diff --git a/src/bin/pg_probackup/parsexlog.cpp b/src/bin/pg_probackup/parsexlog.cpp index c03d3e82c71254cb84acb83043ab384710eba3d9..d81628a2761c5458f2773c553b1d7a632b937855 100644 --- a/src/bin/pg_probackup/parsexlog.cpp +++ b/src/bin/pg_probackup/parsexlog.cpp @@ -1975,7 +1975,11 @@ bool XLogRecGetBlockTag(XLogReaderState* record, uint8 block_id, RelFileNode* rn if (pblk != NULL) { pblk->relNode = bkpb->seg_fileno; pblk->block = bkpb->seg_blockno; +#ifdef ENABLE_SS_MULTIMASTER + pblk->lsn = record->logicLSN; +#else pblk->lsn = record->EndRecPtr; +#endif } return true; } diff --git a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp index ac58f583c4ad79856d45f764626d27e16e644cca..7fb315484065a70382dfa30dabbf3d6dda908336 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_recovery.cpp @@ -316,7 +316,11 @@ XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn() if (SS_ONDEMAND_REALTIME_BUILD_NORMAL) { dms_context_t dms_ctx; InitDmsContext(&dms_ctx); +#ifdef ENABLE_SS_MULTIMASTER + dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_ONDEMAND_REALTIME_RECOVERY_ID; +#else dms_ctx.xmap_ctx.dest_id = (unsigned int)SS_PRIMARY_ID; +#endif if (dms_req_opengauss_immediate_checkpoint(&dms_ctx, (unsigned long long *)&primaryRedoLsn) == GS_SUCCESS) { ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[On-demand] request primary node %d checkpoint success, redoLoc %X/%X", SS_PRIMARY_ID, @@ -328,7 +332,11 @@ XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn() } // read from DMS failed, so read from DSS +#ifdef ENABLE_SS_MULTIMASTER + SSReadControlFile(SS_ONDEMAND_REALTIME_RECOVERY_ID, true); +#else SSReadControlFile(SS_PRIMARY_ID, true); +#endif primaryRedoLsn = g_instance.dms_cxt.ckptRedo; ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[On-demand] read primary node %d checkpoint loc in control file, redoLoc %X/%X", SS_PRIMARY_ID, diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index a9ad4fcec504367b067912c114952cbd838beb9d..4b945c497ee14e2ed3ee964dfc5c6e0949dc9bb2 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -299,7 +299,7 @@ void SSUpdateReformerCtrl() } } -void SSReadControlFile(int id, bool updateDmsCtx) +void SSReadControlFile(int id, bool updateDmsCtx, ControlFileData* temp) { pg_crc32c crc; errno_t rc = EOK; @@ -361,7 +361,9 @@ loop: } else { ControlFileData* controlFile = NULL; ControlFileData tempControlFile; - if (updateDmsCtx) { + if (temp != NULL) { + controlFile = temp; + } else if (updateDmsCtx) { controlFile = &tempControlFile; } else { controlFile = t_thrd.shemem_ptr_cxt.ControlFile; @@ -391,7 +393,7 @@ loop: } } - if (XLByteLE(g_instance.dms_cxt.ckptRedo, controlFile->checkPointCopy.redo)) { + if (temp == NULL && XLByteLE(g_instance.dms_cxt.ckptRedo, controlFile->checkPointCopy.redo)) { g_instance.dms_cxt.ckptRedo = controlFile->checkPointCopy.redo; } } @@ -513,4 +515,4 @@ bool SSPerformingStandbyScenario() } } return false; -} \ No newline at end of file +} diff --git a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp index 352fa494cef851cc26b7e2a2b24db5c50a85dafa..d3c095982ec3f002ab59ec75014fa75d75f32c51 100644 --- a/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp +++ b/src/gausskernel/optimizer/commands/sequence/sequence_util.cpp @@ -52,7 +52,7 @@ void fill_seq_with_data(Relation rel, HeapTuple tuple) sm->magic = SEQ_MAGIC; phdr = (HeapPageHeader)page; -#ifdef ENABLE_SS_MULTIWRITE +#ifdef ENABLE_SS_MULTIMASTER phdr->pd_xid_base = u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId; #else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; diff --git a/src/gausskernel/storage/access/hash/hash_xlog.cpp b/src/gausskernel/storage/access/hash/hash_xlog.cpp index 922e007b5978490dd0ee17137b4f2cfb0dab83f1..48cc7355170454ea6871a1eda647222e8291b957 100644 --- a/src/gausskernel/storage/access/hash/hash_xlog.cpp +++ b/src/gausskernel/storage/access/hash/hash_xlog.cpp @@ -323,7 +323,11 @@ static void hash_xlog_split_complete(XLogReaderState *record) */ static void hash_xlog_move_page_contents(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record); RedoBufferInfo bucketbuf; RedoBufferInfo writebuf; @@ -403,7 +407,11 @@ static void hash_xlog_move_page_contents(XLogReaderState *record) */ static void hash_xlog_squeeze_page(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record); RedoBufferInfo bucketbuf; RedoBufferInfo writebuf; @@ -529,7 +537,11 @@ static void hash_xlog_squeeze_page(XLogReaderState *record) */ static void hash_xlog_delete(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record); RedoBufferInfo bucketbuf; RedoBufferInfo deletebuf; diff --git a/src/gausskernel/storage/access/heap/heapam.cpp b/src/gausskernel/storage/access/heap/heapam.cpp index aa850e2f2c192dc7b63d8900c9011ba5a5403290..333c41f9deecb7c748bd25a0203db99ef21220c2 100755 --- a/src/gausskernel/storage/access/heap/heapam.cpp +++ b/src/gausskernel/storage/access/heap/heapam.cpp @@ -9063,7 +9063,11 @@ void heap_bcm_redo(xl_heap_bcm* xlrec, RelFileNode node, XLogRecPtr lsn) */ static void heap_xlog_bcm(XLogReaderState* record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_heap_bcm* xlrec = (xl_heap_bcm*)XLogRecGetData(record); RelFileNode tmp_node; @@ -9113,7 +9117,11 @@ static void heap_xlog_allvisiblecleared(XLogReaderState *record, int block_id) XLogPhyBlock pblk = { .relNode = vmfileno, .block = vmblock, +#ifdef ENABLE_SS_MULTIMASTER + .lsn = record->logicLSN +#else .lsn = record->EndRecPtr +#endif }; /* vm fork and main fork has the same RelFileNode */ vmbuffer = XLogReadBufferExtended(target_node, VISIBILITYMAP_FORKNUM, mapBlock, RBM_ZERO_ON_ERROR, &pblk); diff --git a/src/gausskernel/storage/access/nbtree/nbtxlog.cpp b/src/gausskernel/storage/access/nbtree/nbtxlog.cpp index b0277c08c4fd4353e1a3aee95c7fc2b4083bfc77..d6c7a16c36c2e339ac57d6e9f78bdac86de8b079 100755 --- a/src/gausskernel/storage/access/nbtree/nbtxlog.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtxlog.cpp @@ -357,7 +357,11 @@ static void btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record, return; } +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_btree_split_posting *xlrec = (xl_btree_split_posting *)XLogRecGetData(record); bool isleaf = (xlrec->level == 0); RedoBufferInfo lbuf; @@ -567,7 +571,11 @@ static void btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record, static void btree_xlog_dedup(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_btree_dedup *xlrec = (xl_btree_dedup *)XLogRecGetData(record); RedoBufferInfo redobuf; @@ -739,7 +747,11 @@ static void btree_xlog_delete(XLogReaderState *record) static void btree_xlog_delete_page(uint8 info, XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_btree_delete_page *xlrec = (xl_btree_delete_page *)XLogRecGetData(record); RelFileNode rnode; BlockNumber parent; @@ -992,7 +1004,11 @@ static void btree_xlog_newroot(XLogReaderState *record, bool issplitupgrade) return; } +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif xl_btree_newroot *xlrec = (xl_btree_newroot *)XLogRecGetData(record); RelFileNode rnode; Page page; diff --git a/src/gausskernel/storage/access/redo/redo_segpage.cpp b/src/gausskernel/storage/access/redo/redo_segpage.cpp index 2add2872b557438570a26c34f1cb01188e855c21..3b5e8d3edfcaffbdd4317b825aa702371c52f8d6 100644 --- a/src/gausskernel/storage/access/redo/redo_segpage.cpp +++ b/src/gausskernel/storage/access/redo/redo_segpage.cpp @@ -451,7 +451,11 @@ void SegPageRedoNewPage(XLogBlockHead *blockhead, XLogBlockSegNewPage *newPageIn Assert(newPageInfo->dataLen != 0); BufferTag *tag = (BufferTag *)newPageInfo->mainData; +#ifdef ENABLE_SS_MULTIMASTER + seg_redo_new_page_copy_and_flush(tag, newPageInfo->mainData + sizeof(BufferTag), blockhead->end_ptr, blockhead->logic_lsn); +#else seg_redo_new_page_copy_and_flush(tag, newPageInfo->mainData + sizeof(BufferTag), blockhead->end_ptr); +#endif } void MarkSegPageRedoChildPageDirty(RedoBufferInfo *bufferinfo) diff --git a/src/gausskernel/storage/access/redo/redo_sequence.cpp b/src/gausskernel/storage/access/redo/redo_sequence.cpp index a56c56ae3ea73caa87d6055457403dda5bf1e8f1..4e7574e558cb609c2f8d636a84f96d5b68a8ba1d 100644 --- a/src/gausskernel/storage/access/redo/redo_sequence.cpp +++ b/src/gausskernel/storage/access/redo/redo_sequence.cpp @@ -91,12 +91,7 @@ void seqRedoOperatorPage(RedoBufferInfo *buffer, void *itmedata, Size itemsz) sm->magic = SEQ_MAGIC; phdr = (HeapPageHeader)localpage; -#ifdef ENABLE_SS_MULTIMASTER - phdr->pd_xid_base = ENABLE_DMS ? u_sess->utils_cxt.CurrentSnapshot->g_oldestxmin - FirstNormalTransactionId - : u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#else phdr->pd_xid_base = u_sess->utils_cxt.RecentXmin - FirstNormalTransactionId; -#endif phdr->pd_multi_base = 0; if (PageAddItem(localpage, (Item)item, itemsz, FirstOffsetNumber, false, false) == InvalidOffsetNumber) diff --git a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp index 5f4b047d145648424d97992202d9d22590923352..2aa458b122598b013f9fcbc193db21fa800192db 100644 --- a/src/gausskernel/storage/access/redo/redo_xlogutils.cpp +++ b/src/gausskernel/storage/access/redo/redo_xlogutils.cpp @@ -326,6 +326,12 @@ void XLogRecSetBlockCommonState(XLogReaderState *record, XLogBlockParseEnum bloc blockparse->blockhead.pblk.block = InvalidBlockNumber; blockparse->blockhead.pblk.lsn = InvalidXLogRecPtr; } + +#ifdef ENABLE_SS_MULTIMASTER + blockparse->blockhead.xlog_path = record->xlogPath; + blockparse->blockhead.instId = record->instId; + blockparse->blockhead.logic_lsn = record->logicLSN; +#endif } #ifdef USE_ASSERT_CHECKING @@ -421,7 +427,11 @@ void XLogRecSetBlockDataState(XLogReaderState *record, uint32 blockid, XLogRecPa pblk.block = decodebkp->seg_blockno; pblk.relNode = decodebkp->seg_fileno; +#ifdef ENABLE_SS_MULTIMASTER + pblk.lsn = record->logicLSN; +#else pblk.lsn = record->EndRecPtr; +#endif SegmentCheck(!XLOG_NEED_PHYSICAL_LOCATION(decodebkp->rnode) || PhyBlockIsValid(pblk)); RelFileNodeForkNum filenode = @@ -456,7 +466,11 @@ void XLogRecSetVmBlockState(XLogReaderState *record, uint32 blockid, XLogRecPars XLogPhyBlock pblk; pblk.relNode = vmFile; pblk.block = vmBlkNo; +#ifdef ENABLE_SS_MULTIMASTER + pblk.lsn = record->logicLSN; +#else pblk.lsn = record->EndRecPtr; +#endif SegmentCheck(!XLOG_NEED_PHYSICAL_LOCATION(rnode) || PhyBlockIsValid(pblk)); BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk); diff --git a/src/gausskernel/storage/access/redo/xlogreader_common.cpp b/src/gausskernel/storage/access/redo/xlogreader_common.cpp index 8dd8ae1eccbad5be67a8cf5ef419c38d88c12bb9..203dddcce4ed64bafc3f75e235c1542f27c18ae1 100644 --- a/src/gausskernel/storage/access/redo/xlogreader_common.cpp +++ b/src/gausskernel/storage/access/redo/xlogreader_common.cpp @@ -60,7 +60,11 @@ bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, RelFileNode *rn if (pblk != NULL) { pblk->relNode = bkpb->seg_fileno; pblk->block = bkpb->seg_blockno; +#ifdef ENABLE_SS_MULTIMASTER + pblk->lsn = record->logicLSN; +#else pblk->lsn = record->EndRecPtr; +#endif } return true; } diff --git a/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp index 8a52568781e2f8eb103a74f11047d18f12c7b3d6..d920a34ce4b1d04cc604b610e6a971eb4bd62a14 100755 --- a/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/dispatcher.cpp @@ -353,6 +353,7 @@ void AllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) #endif } +#ifndef ENABLE_SS_MULTIMASTER void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) { XLogReaderState *initreader; @@ -391,6 +392,60 @@ void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) InitLsnCheckCtl(xlogreader->ReadRecPtr); #endif } +#else +void SSAllocRecordReadBuffer(XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen) +{ + XLogReaderState *initreader; + XLogReaderState *xlogreader; + errno_t errorno = EOK; + g_dispatcher->rtoXlogBufState.mulReadBuf = (char **)palloc0(instanceNum * sizeof(char *)); + g_dispatcher->rtoXlogBufState.mulErrorMsgBuf = (char **)palloc0(instanceNum * sizeof(char *)); + g_dispatcher->rtoXlogBufState.mulReadPrivate = (void **)palloc0(instanceNum * sizeof(void *)); + g_dispatcher->rtoXlogBufState.mulInitReaders = (XLogReaderState **)palloc0(instanceNum * sizeof(XLogReaderState *)); + g_dispatcher->rtoXlogBufState.instanceNum = instanceNum; + + for (int i = 0; i < instanceNum; i++) { + g_dispatcher->rtoXlogBufState.mulReadBuf[i] = (char *)palloc0(XLOG_BLCKSZ); + g_dispatcher->rtoXlogBufState.mulErrorMsgBuf[i] = (char *)palloc0(MAX_ERRORMSG_LEN + 1); + g_dispatcher->rtoXlogBufState.mulReadPrivate[i] = (void *)palloc0(MAXALIGN(privateLen)); + } + + for (int i = 0; i < instanceNum; i++) { + xlogreader = xlogReaderList[i]; + initreader = GetXlogReader(xlogreader); + initreader->isPRProcess = true; + initreader->readBuf = g_dispatcher->rtoXlogBufState.mulReadBuf[i]; + errorno = memcpy_s(initreader->readBuf, XLOG_BLCKSZ, xlogreader->readBuf, xlogreader->readLen); + securec_check(errorno, "", ""); + initreader->errormsg_buf = g_dispatcher->rtoXlogBufState.mulErrorMsgBuf[i]; + initreader->private_data = g_dispatcher->rtoXlogBufState.mulReadPrivate[i]; + CopyDataFromOldReader(initreader, xlogreader); + g_dispatcher->rtoXlogBufState.mulInitReaders[i] = initreader; +#ifdef USE_ASSERT_CHECKING + InitLsnCheckCtl(xlogreader->ReadRecPtr); +#endif + } + g_dispatcher->rtoXlogBufState.readWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readPageWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readSource = 0; + g_dispatcher->rtoXlogBufState.failSource = 0; + g_dispatcher->rtoXlogBufState.xlogReadManagerState = READ_MANAGER_RUN; + g_dispatcher->rtoXlogBufState.targetRecPtr = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.expectLsn = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.waitRedoDone = 0; + g_dispatcher->rtoXlogBufState.readBuf = (char *)palloc0(XLOG_BLCKSZ); + g_dispatcher->rtoXlogBufState.readprivate = (void *)palloc0(MAXALIGN(privateLen)); + errorno = memset_s(g_dispatcher->rtoXlogBufState.readprivate, MAXALIGN(privateLen), 0, MAXALIGN(privateLen)); + securec_check(errorno, "", ""); + + g_dispatcher->rtoXlogBufState.errormsg_buf = (char *)palloc0(MAX_ERRORMSG_LEN + 1); + g_dispatcher->rtoXlogBufState.errormsg_buf[0] = '\0'; + + g_recordbuffer = &g_dispatcher->rtoXlogBufState; + g_startupTriggerState = TRIGGER_NORMAL; + g_readManagerTriggerFlag = TRIGGER_NORMAL; +} +#endif void StartupInterruptsForExtremeRto() { @@ -425,6 +480,7 @@ void StartupInterruptsForExtremeRto() } } +#ifndef ENABLE_SS_MULTIMASTER /* Run from the dispatcher thread. */ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) { @@ -466,6 +522,51 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) close_readFile_if_open(); } } +#else + +void StartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen) +{ + if (get_real_recovery_parallelism() > 1) { + if (t_thrd.xlog_cxt.StandbyModeRequested) { + ReLeaseRecoveryLatch(); + } + + CheckAlivePageWorkers(); + g_dispatcher = CreateDispatcher(); + g_dispatcher->oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx); + g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE * + ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker + /* alloc for record readbuf */ + if (ENABLE_DMS && ENABLE_DSS) { + SSAllocRecordReadBuffer(xlogReaderList, instanceNum, privateLen); + } else { + AllocRecordReadBuffer(xlogreader, privateLen); + } + StartPageRedoWorkers(get_real_recovery_parallelism()); + + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("[PR]: max=%d, thrd=%d", g_instance.attr.attr_storage.max_recovery_parallelism, + get_real_recovery_parallelism()))); + WaitWorkerReady(); + SpinLockAcquire(&(g_instance.comm_cxt.predo_cxt.rwlock)); + g_instance.comm_cxt.predo_cxt.state = REDO_IN_PROGRESS; + SpinLockRelease(&(g_instance.comm_cxt.predo_cxt.rwlock)); + + Assert(g_instance.pid_cxt.exrto_recycler_pid == 0); + if (g_instance.attr.attr_storage.EnableHotStandby) { + g_instance.pid_cxt.exrto_recycler_pid = initialize_util_thread(EXRTO_RECYCLER); + } + + on_shmem_exit(StopRecoveryWorkers, 0); + + g_dispatcher->oldStartupIntrruptFunc = RegisterRedoInterruptCallBack(StartupInterruptsForExtremeRto); + + close_readFile_if_open(); + } +} + +#endif + void DumpDispatcher() { @@ -1806,6 +1907,13 @@ void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState, newState->ReadRecPtr = oldState->ReadRecPtr; newState->EndRecPtr = oldState->EndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + newState->xlogPath = oldState->xlogPath; + newState->isEnd = oldState->isEnd; + newState->instId = oldState->instId; + newState->logicLSN = oldState->logicLSN; + newState->args = oldState->args; +#endif newState->readSegNo = oldState->readSegNo; newState->readOff = oldState->readOff; newState->readPageTLI = oldState->readPageTLI; diff --git a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp index 83bead5545d4c53114e3fac3ccd0b37762295dc8..a4bdc091dbfe366ce44d948869fa189c8dcfb18d 100755 --- a/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/page_redo.cpp @@ -463,7 +463,11 @@ void RecordBlockCheck(void *rec, XLogRecPtr curPageLsn, uint32 blockId, bool rep record->decoded_record->xl_rmid, record->decoded_record->xl_info))); } else if (!(rmid == RM_HEAP2_ID && info == XLOG_HEAP2_VISIBLE) && !(rmid == RM_HEAP_ID && info == XLOG_HEAP_NEWPAGE)) { +#ifdef ENABLE_SS_MULTIMASTER + Assert(XLByteLE(record->logicLSN, curPageLsn)); +#else Assert(XLByteLE(record->EndRecPtr, curPageLsn)); +#endif } } @@ -1674,6 +1678,38 @@ void PutRecordToReadQueue(XLogReaderState *recordreader) SPSCBlockingQueuePut(g_dispatcher->readLine.readPageThd->queue, recordreader); } + +#ifdef ENABLE_SS_MULTIMASTER +static int get_min_xlogreader(XLogReaderState **xlogreaderList, int instanceNum) +{ + int minIndex = -1; + XLogRecord *record = NULL; + XLogRecPtr minLogicRecPtr = MAX_XLOG_REC_PTR; + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->isEnd) { + continue; + } + record = (XLogRecord *)xlogreaderList[i]->readRecordBuf; + if (record != NULL && record->logic_lsn < minLogicRecPtr) { + minIndex = i; + minLogicRecPtr = record->logic_lsn; + } + } + return minIndex; +} + +inline void InitXLogMulRecordReadBuffer(XLogReaderState **initReaderList, int *instanceNum) +{ + int minIndex = 0; + *instanceNum = g_dispatcher->rtoXlogBufState.instanceNum; + for (int i = 0; i < *instanceNum; i++) { + initReaderList[i] = g_dispatcher->rtoXlogBufState.mulInitReaders[i]; + g_dispatcher->rtoXlogBufState.mulInitReaders[i] = NULL; + } +} +#endif + + inline void InitXLogRecordReadBuffer(XLogReaderState **initreader) { XLogReaderState *newxlogreader; @@ -2112,12 +2148,22 @@ void XLogReadPageWorkerMain() { XLogReaderState *xlogreader = NULL; +#ifdef ENABLE_SS_MULTIMASTER + int instanceNum = 0; + XLogReaderState **xlogReaderList = NULL; + xlogReaderList = (XLogReaderState **)palloc0(DMS_MAX_INSTANCE * sizeof(XLogReaderState *)); +#endif + (void)RegisterRedoInterruptCallBack(HandlePageRedoInterrupts); g_recordbuffer = &g_dispatcher->rtoXlogBufState; GetRecoveryLatch(); /* init readstate */ +#ifdef ENABLE_SS_MULTIMASTER + InitXLogMulRecordReadBuffer(xlogReaderList, &instanceNum); +#else InitXLogRecordReadBuffer(&xlogreader); +#endif pg_atomic_write_u32(&(g_recordbuffer->readPageWorkerState), WORKER_STATE_RUN); if (IsRecoveryDone()) { @@ -2126,7 +2172,17 @@ void XLogReadPageWorkerMain() pg_atomic_write_u32(&(g_recordbuffer->readSource), XLOG_FROM_STREAM); } +#ifdef ENABLE_SS_MULTIMASTER + int minIndex = 0; + XLogRecord *record; + minIndex = get_min_xlogreader(xlogReaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogReaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + } +#else XLogRecord *record = XLogParallelReadNextRecord(xlogreader); +#endif while (record != NULL) { if (ReadPageWorkerStop()) { break; @@ -2149,6 +2205,17 @@ void XLogReadPageWorkerMain() record = XLogParallelReadNextRecord(xlogreader); CountAndGetRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_1], g_redoWorker->timeCostList[TIME_COST_STEP_2]); CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_2]); +#ifdef ENABLE_SS_MULTIMASTER + if (record == NULL) { + xlogreader->isEnd = true; + } + xlogReaderList[minIndex] = xlogreader; + minIndex = get_min_xlogreader(xlogReaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogReaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + } +#endif RedoInterruptCallBack(); ADD_ABNORMAL_POSITION(8); } @@ -3189,4 +3256,4 @@ bool exceed_send_lsn_forworder_interval() return true; } -} // namespace extreme_rto \ No newline at end of file +} // namespace extreme_rto diff --git a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp index b9e093c06fcefcc926764964a8c3e35888219bf9..b0db5d75b2b6da00b3dd932b8882582c0230b693 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto/xlog_read.cpp @@ -920,6 +920,9 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * securec_check_c(errorno, "\0", "\0"); record = (XLogRecord *)state->readRecordBuf; } +#ifdef ENABLE_SS_MULTIMASTER + state->logicLSN = record->logic_lsn; +#endif /* * Special processing if it's an XLOG SWITCH record diff --git a/src/gausskernel/storage/access/transam/extreme_rto_redo_api.cpp b/src/gausskernel/storage/access/transam/extreme_rto_redo_api.cpp index 057873cd6f9f4908ac171b123b73f9389154ae04..ecb6a9672ee7e9ae134c90f89d5f784c5a248112 100644 --- a/src/gausskernel/storage/access/transam/extreme_rto_redo_api.cpp +++ b/src/gausskernel/storage/access/transam/extreme_rto_redo_api.cpp @@ -60,7 +60,12 @@ typedef struct f_extreme_rto_redo { bool segment_shrink); void (*batch_clear_recovery_thread_hash_tbl)(Oid spcNode, Oid dbNode); bool (*redo_worker_is_undo_space_worker)(void); + +#ifndef ENABLE_SS_MULTIMASTER void (*start_recovery_workers)(XLogReaderState *xlogreader, uint32 privateLen); +#else + void (*start_recovery_workers)(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen); +#endif void (*dispatch_redo_record_to_file)(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime); void (*get_thread_name_if_page_redo_worker)(int argc, char *argv[], char **threadNamePtr); PGPROC *(*startup_pid_get_proc)(ThreadId pid); @@ -204,10 +209,18 @@ bool ExtremeRedoWorkerIsUndoSpaceWorker() return (*(extreme_rto_redosw[g_extreme_rto_type].redo_worker_is_undo_space_worker))(); } +#ifndef ENABLE_SS_MULTIMASTER void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) { (*(extreme_rto_redosw[g_extreme_rto_type].start_recovery_workers))(xlogreader, privateLen); } +#else +void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen) +{ + (*(extreme_rto_redosw[g_extreme_rto_type].start_recovery_workers))(xlogreader, xlogReaderList, instanceNum, privateLen); +} + +#endif void ExtremeDispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime) { diff --git a/src/gausskernel/storage/access/transam/multi_redo_api.cpp b/src/gausskernel/storage/access/transam/multi_redo_api.cpp index 5697de461e20a40290e5afe7a0ada1bc8309af9a..c451266fedb885b6cea0c3c485e5fcdb0f220b75 100644 --- a/src/gausskernel/storage/access/transam/multi_redo_api.cpp +++ b/src/gausskernel/storage/access/transam/multi_redo_api.cpp @@ -34,10 +34,12 @@ #include "access/parallel_recovery/dispatcher.h" #include "access/parallel_recovery/page_redo.h" #include "access/xlog_internal.h" +#include "access/ondemand_extreme_rto/dispatcher.h" bool g_supportHotStandby = true; /* don't support consistency view */ uint32 g_startupTriggerState = TRIGGER_NORMAL; +#ifndef ENABLE_SS_MULTIMASTER void StartUpMultiRedo(XLogReaderState *xlogreader, uint32 privateLen) { if (IsExtremeRedo()) { @@ -46,6 +48,16 @@ void StartUpMultiRedo(XLogReaderState *xlogreader, uint32 privateLen) parallel_recovery::StartRecoveryWorkers(xlogreader->ReadRecPtr); } } +#else +void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderList, int instanceNum, uint32 privateLen) +{ + if (IsExtremeRedo()) { + ExtremeStartRecoveryWorkers(xlogreader, xlogreaderList, instanceNum, privateLen); + } else if (IsParallelRedo()) { + parallel_recovery::StartRecoveryWorkers(xlogreader->ReadRecPtr); + } +} +#endif bool IsMultiThreadRedoRunning() { @@ -72,7 +84,11 @@ void DispatchRedoRecord(XLogReaderState *record, List *expectedTLIs, TimestampTz CountXLogNumbers(record); if (XLogRecGetRmid(record) == RM_XACT_ID) SetLatestXTime(recordXTime); +#ifdef ENABLE_SS_MULTIMASTER + SetXLogReplayRecPtr(record->ReadRecPtr, record->EndRecPtr, record->logicLSN); +#else SetXLogReplayRecPtr(record->ReadRecPtr, record->EndRecPtr); +#endif CheckRecoveryConsistency(); } } @@ -395,6 +411,14 @@ void DiagLogRedoRecord(XLogReaderState *record, const char *funcName) void ApplyRedoRecord(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogReaderArgs args; + if (record->args) { + memset_s(&args, sizeof(args), 0, sizeof(args)); + CurrentArgsCopy(&args); + NodeArgsSwitch(record->args); + } +#endif ErrorContextCallback errContext; errContext.callback = rm_redo_error_callback; errContext.arg = (void *)record; @@ -404,7 +428,11 @@ void ApplyRedoRecord(XLogReaderState *record) DiagLogRedoRecord(record, "ApplyRedoRecord"); } RmgrTable[XLogRecGetRmid(record)].rm_redo(record); - +#ifdef ENABLE_SS_MULTIMASTER + if (record->args) { + NodeArgsSwitch(&args); + } +#endif t_thrd.log_cxt.error_context_stack = errContext.previous; } diff --git a/src/gausskernel/storage/access/transam/multixact.cpp b/src/gausskernel/storage/access/transam/multixact.cpp index ad9d16d519a948f5dd77206014d6ce97a0e09826..fa2fb0c59ba41793f4cce75b0bfae18190d19fcc 100644 --- a/src/gausskernel/storage/access/transam/multixact.cpp +++ b/src/gausskernel/storage/access/transam/multixact.cpp @@ -2225,3 +2225,10 @@ void SSMultiXactShmemClear(void) GetBuiltInTrancheName(LWTRANCHE_MULTIXACTMEMBER_CTL), LWTRANCHE_MULTIXACTMEMBER_CTL, DSS_MAX_MXACTMEMBER, 0, MultiXactMemberControlLock, path); } + +#ifdef ENABLE_SS_MULTIMASTER +Size SSMultiXactSize(void) +{ + return sizeof(MultiXactStateData); +} +#endif diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp index 858f2833579db50a5a8d017b111d5dd812f618b4..60870725608c00377af26f247aa5bb19079e9642 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/dispatcher.cpp @@ -355,6 +355,7 @@ void AllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) #endif } +#ifndef ENABLE_SS_MULTIMASTER void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) { XLogReaderState *initreader; @@ -393,6 +394,60 @@ void SSAllocRecordReadBuffer(XLogReaderState *xlogreader, uint32 privateLen) InitLsnCheckCtl(xlogreader->ReadRecPtr); #endif } +#else +void SSAllocRecordReadBuffer(XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen) +{ + XLogReaderState *initreader; + XLogReaderState *xlogreader; + errno_t errorno = EOK; + g_dispatcher->rtoXlogBufState.mulReadBuf = (char **)palloc0(instanceNum * sizeof(char *)); + g_dispatcher->rtoXlogBufState.mulErrorMsgBuf = (char **)palloc0(instanceNum * sizeof(char *)); + g_dispatcher->rtoXlogBufState.mulReadPrivate = (void **)palloc0(instanceNum * sizeof(void *)); + g_dispatcher->rtoXlogBufState.mulInitReaders = (XLogReaderState **)palloc0(instanceNum * sizeof(XLogReaderState *)); + g_dispatcher->rtoXlogBufState.instanceNum = instanceNum; + + for (int i = 0; i < instanceNum; i++) { + g_dispatcher->rtoXlogBufState.mulReadBuf[i] = (char *)palloc0(XLOG_BLCKSZ); + g_dispatcher->rtoXlogBufState.mulErrorMsgBuf[i] = (char *)palloc0(MAX_ERRORMSG_LEN + 1); + g_dispatcher->rtoXlogBufState.mulReadPrivate[i] = (void *)palloc0(MAXALIGN(privateLen)); + } + + for (int i = 0; i < instanceNum; i++) { + xlogreader = xlogReaderList[i]; + initreader = GetXlogReader(xlogreader); + initreader->isPRProcess = true; + initreader->readBuf = g_dispatcher->rtoXlogBufState.mulReadBuf[i]; + errorno = memcpy_s(initreader->readBuf, XLOG_BLCKSZ, xlogreader->readBuf, xlogreader->readLen); + securec_check(errorno, "", ""); + initreader->errormsg_buf = g_dispatcher->rtoXlogBufState.mulErrorMsgBuf[i]; + initreader->private_data = g_dispatcher->rtoXlogBufState.mulReadPrivate[i]; + CopyDataFromOldReader(initreader, xlogreader); + g_dispatcher->rtoXlogBufState.mulInitReaders[i] = initreader; +#ifdef USE_ASSERT_CHECKING + InitLsnCheckCtl(xlogreader->ReadRecPtr); +#endif + } + g_dispatcher->rtoXlogBufState.readWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readPageWorkerState = WORKER_STATE_STOP; + g_dispatcher->rtoXlogBufState.readSource = 0; + g_dispatcher->rtoXlogBufState.failSource = 0; + g_dispatcher->rtoXlogBufState.xlogReadManagerState = READ_MANAGER_RUN; + g_dispatcher->rtoXlogBufState.targetRecPtr = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.expectLsn = InvalidXLogRecPtr; + g_dispatcher->rtoXlogBufState.waitRedoDone = 0; + g_dispatcher->rtoXlogBufState.readBuf = (char *)palloc0(XLOG_BLCKSZ); + g_dispatcher->rtoXlogBufState.readprivate = (void *)palloc0(MAXALIGN(privateLen)); + errorno = memset_s(g_dispatcher->rtoXlogBufState.readprivate, MAXALIGN(privateLen), 0, MAXALIGN(privateLen)); + securec_check(errorno, "", ""); + + g_dispatcher->rtoXlogBufState.errormsg_buf = (char *)palloc0(MAX_ERRORMSG_LEN + 1); + g_dispatcher->rtoXlogBufState.errormsg_buf[0] = '\0'; + + g_recordbuffer = &g_dispatcher->rtoXlogBufState; + g_startupTriggerState = TRIGGER_NORMAL; + g_readManagerTriggerFlag = TRIGGER_NORMAL; +} +#endif void HandleStartupInterruptsForExtremeRto() { @@ -447,6 +502,7 @@ static void SetOndemandXLogParseFlagValue(uint32 maxParseBufNum) g_ondemandRealtimeBuildQueueFullValue = REALTIME_BUILD_RECORD_QUEUE_SIZE * ONDEMAND_FORCE_PRUNE_RATIO; } +#ifndef ENABLE_SS_MULTIMASTER /* Run from the dispatcher thread. */ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) { @@ -496,6 +552,57 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen) } } +#else +void StartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen) +{ + if (get_real_recovery_parallelism() > 1) { + if (t_thrd.xlog_cxt.StandbyModeRequested) { + ReLeaseRecoveryLatch(); + } + + CheckAlivePageWorkers(); + g_dispatcher = CreateDispatcher(); + g_dispatcher->oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx); + g_instance.comm_cxt.redoItemCtx = AllocSetContextCreate((MemoryContext)g_instance.instance_context, + "redoItemSharedMemory", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE, + SHARED_CONTEXT); + g_instance.comm_cxt.predo_cxt.redoItemHashCtrl = PRInitRedoItemHashForAllPipeline(g_instance.comm_cxt.redoItemCtx); + if (ENABLE_ONDEMAND_REALTIME_BUILD && SS_ONDEMAND_REALTIME_BUILD_NORMAL) { + errno_t rc = EOK; + g_dispatcher->restoreControlFile = (ControlFileData *)palloc(sizeof(ControlFileData)); + rc = memcpy_s(g_dispatcher->restoreControlFile, (size_t)sizeof(ControlFileData), &restoreControlFile, (size_t)sizeof(ControlFileData)); + securec_check(rc, "", ""); + } + g_dispatcher->maxItemNum = (get_batch_redo_num() + 4) * PAGE_WORK_QUEUE_SIZE * + ITEM_QUQUE_SIZE_RATIO; // 4: a startup, readmanager, txnmanager, txnworker + uint32 maxParseBufNum = (uint32)((uint64)g_instance.attr.attr_storage.dms_attr.ondemand_recovery_mem_size * + 1024 / (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc) + sizeof(RedoMemSlot))); + XLogParseBufferInitFunc(&(g_dispatcher->parseManager), maxParseBufNum, &recordRefOperate, RedoInterruptCallBack); + SetOndemandXLogParseFlagValue(maxParseBufNum); + /* alloc for record readbuf */ + SSAllocRecordReadBuffer(xlogReaderList, instanceNum, privateLen); + StartPageRedoWorkers(get_real_recovery_parallelism(), SS_ONDEMAND_REALTIME_BUILD_NORMAL); + + ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), + errmsg("[PR]: max=%d, thrd=%d", g_instance.attr.attr_storage.max_recovery_parallelism, + get_real_recovery_parallelism()))); + WaitWorkerReady(); + SpinLockAcquire(&(g_instance.comm_cxt.predo_cxt.rwlock)); + g_instance.comm_cxt.predo_cxt.state = REDO_IN_PROGRESS; + SpinLockRelease(&(g_instance.comm_cxt.predo_cxt.rwlock)); + on_shmem_exit(StopRecoveryWorkers, 0); + + g_dispatcher->oldStartupIntrruptFunc = RegisterRedoInterruptCallBack(HandleStartupInterruptsForExtremeRto); + + close_readFile_if_open(); + } +} +#endif + + void DumpDispatcher() { knl_parallel_redo_state state; @@ -1014,7 +1121,11 @@ static bool DispatchXLogRecord(XLogReaderState *record, List *expectedTLIs, Time if (IsCheckPoint(record)) { RedoItem *item = GetRedoItemPtr(record); +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr ckptRecordRedoPtr = record->logicLSN; +#else XLogRecPtr ckptRecordRedoPtr = GetRedoLocInCheckpointRecord(record); +#endif FreeRedoItem(item); UpdateCheckpointRedoPtrForPrune(ckptRecordRedoPtr); AddTxnRedoItem(g_dispatcher->trxnLine.managerThd, &g_hashmapPruneMark); @@ -1748,6 +1859,13 @@ void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState, newState->ReadRecPtr = oldState->ReadRecPtr; newState->EndRecPtr = oldState->EndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + newState->xlogPath = oldState->xlogPath; + newState->isEnd = oldState->isEnd; + newState->instId = oldState->instId; + newState->logicLSN = oldState->logicLSN; + newState->args = oldState->args; +#endif newState->readSegNo = oldState->readSegNo; newState->readOff = oldState->readOff; newState->readPageTLI = oldState->readPageTLI; diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp index dd3e5318c1cc8442dd48737499651b19d741e11f..8f99ecab8389cbf16d539f811aec16d2f0efdf9e 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/page_redo.cpp @@ -448,7 +448,11 @@ void RecordBlockCheck(void *rec, XLogRecPtr curPageLsn, uint32 blockId, bool rep record->decoded_record->xl_rmid, record->decoded_record->xl_info))); } else if (!(rmid == RM_HEAP2_ID && info == XLOG_HEAP2_VISIBLE) && !(rmid == RM_HEAP_ID && info == XLOG_HEAP_NEWPAGE)) { +#ifdef ENABLE_SS_MULTIMASTER + Assert(XLByteLE(record->logicLSN, curPageLsn)); +#else Assert(XLByteLE(record->EndRecPtr, curPageLsn)); +#endif } } @@ -1368,7 +1372,12 @@ static bool WaitPrimaryDoCheckpointAndAllPRTrackEmpty(XLogRecParseState *preStat bool notifyDone = false; bool waitDone = false; +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr ddlSyncPtr = preState->blockparse.blockhead.logic_lsn; +#else XLogRecPtr ddlSyncPtr = preState->blockparse.blockhead.end_ptr; +#endif + // notify dispatcher thread and wait for primary checkpoint XLogRecPtr syncRecordPtr; @@ -1435,7 +1444,11 @@ static void OndemandSwitchHTABIfBlockNumUpperLimit() void PageManagerRedoParseState(XLogRecParseState *preState) { PageManagerPruneIfRealtimeBuildFailover(); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(preState->blockparse.blockhead.logic_lsn, g_redoWorker->nextPrunePtr)) { +#else if (XLByteLT(preState->blockparse.blockhead.end_ptr, g_redoWorker->nextPrunePtr)) { +#endif ReleaseBlockParseStateIfNotReplay(preState); return; } @@ -1458,7 +1471,11 @@ void PageManagerRedoParseState(XLogRecParseState *preState) SetCompletedReadEndPtr(g_redoWorker, preState->blockparse.blockhead.start_ptr, preState->blockparse.blockhead.end_ptr); SSReleaseRefRecordWithoutReplay(preState); +#ifdef ENABLE_SS_MULTIMASTER + g_redoWorker->redoItemHashCtrl->maxRedoItemPtr = preState->blockparse.blockhead.logic_lsn; +#else g_redoWorker->redoItemHashCtrl->maxRedoItemPtr = preState->blockparse.blockhead.end_ptr; +#endif CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]); break; case BLOCK_DATA_DDL_TYPE: @@ -1542,7 +1559,11 @@ static void OndemandCheckHashMapDistributeDone() XLogRecParseState *nextState = (XLogRecParseState *)procState->nextrecord; Assert(procState->distributeStatus != XLOG_NO_DISTRIBUTE); if (nextState != NULL) { +#ifdef ENABLE_SS_MULTIMASTER + Assert(XLByteLE(procState->blockparse.blockhead.logic_lsn, nextState->blockparse.blockhead.logic_lsn)); +#else Assert(XLByteLE(procState->blockparse.blockhead.end_ptr, nextState->blockparse.blockhead.end_ptr)); +#endif } procState = nextState; } @@ -1666,7 +1687,11 @@ static void TrxnManagerProcHashMapPrune() if (XLByteLT(g_redoWorker->nextPrunePtr, prunePtr)) { while (!SPSCBlockingQueueIsEmpty(g_dispatcher->trxnQueue)) { RedoItem *item = (RedoItem *)SPSCBlockingQueueTop(g_dispatcher->trxnQueue); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(prunePtr, item->record.logicLSN)) { +#else if (XLByteLT(prunePtr, item->record.EndRecPtr)) { +#endif break; } DereferenceRedoItem(item); @@ -1732,7 +1757,11 @@ bool TrxnManagerDistributeItemsBeforeEnd(RedoItem *item) } else if (item == (void *)&g_hashmapPruneMark) { TrxnManagerProcHashMapPrune(); } else { +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(item->record.logicLSN, g_redoWorker->nextPrunePtr)) { +#else if (XLByteLT(item->record.EndRecPtr, g_redoWorker->nextPrunePtr)) { +#endif DereferenceRedoItem(item); return exitFlag; } @@ -2343,6 +2372,36 @@ void PutRecordToReadQueue(XLogReaderState *recordreader) SPSCBlockingQueuePut(g_dispatcher->readLine.readPageThd->queue, recordreader); } +#ifdef ENABLE_SS_MULTIMASTER +static int get_min_xlogreader(XLogReaderState **xlogreaderList, int instanceNum) +{ + int minIndex = -1; + XLogRecord *record = NULL; + XLogRecPtr minLogicRecPtr = MAX_XLOG_REC_PTR; + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->isEnd) { + continue; + } + record = (XLogRecord *)xlogreaderList[i]->readRecordBuf; + if (record != NULL && record->logic_lsn < minLogicRecPtr) { + minIndex = i; + minLogicRecPtr = record->logic_lsn; + } + } + return minIndex; +} + +inline void InitXLogMulRecordReadBuffer(XLogReaderState **initReaderList, int *instanceNum) +{ + int minIndex = 0; + *instanceNum = g_dispatcher->rtoXlogBufState.instanceNum; + for (int i = 0; i < *instanceNum; i++) { + initReaderList[i] = g_dispatcher->rtoXlogBufState.mulInitReaders[i]; + g_dispatcher->rtoXlogBufState.mulInitReaders[i] = NULL; + } +} +#endif + inline void InitXLogRecordReadBuffer(XLogReaderState **initreader) { XLogReaderState *newxlogreader; @@ -2742,7 +2801,11 @@ static void CheckAndDoForceFinish(XLogReaderState *xlogreader) void XLogReadPageWorkerMain() { XLogReaderState *xlogreader = NULL; - +#ifdef ENABLE_SS_MULTIMASTER + int instanceNum = 0; + XLogReaderState **xlogReaderList = NULL; + xlogReaderList = (XLogReaderState **)palloc0(DMS_MAX_INSTANCE * sizeof(XLogReaderState *)); +#endif (void)RegisterRedoInterruptCallBack(HandlePageRedoInterrupts); g_recordbuffer = &g_dispatcher->rtoXlogBufState; @@ -2751,11 +2814,27 @@ void XLogReadPageWorkerMain() on_shmem_exit(RealtimeBuildReleaseRecoveryLatch, 0); } /* init readstate */ +#ifdef ENABLE_SS_MULTIMASTER + InitXLogMulRecordReadBuffer(xlogReaderList, &instanceNum); +#else InitXLogRecordReadBuffer(&xlogreader); +#endif + pg_atomic_write_u32(&(g_recordbuffer->readPageWorkerState), WORKER_STATE_RUN); +#ifdef ENABLE_SS_MULTIMASTER + int minIndex = 0; + XLogRecord *record; + minIndex = get_min_xlogreader(xlogReaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogReaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + } +#else XLogRecord *record = XLogParallelReadNextRecord(xlogreader); +#endif + while (record != NULL) { if (ReadPageWorkerStop()) { break; @@ -2777,6 +2856,17 @@ void XLogReadPageWorkerMain() record = XLogParallelReadNextRecord(xlogreader); CountAndGetRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_1], g_redoWorker->timeCostList[TIME_COST_STEP_2]); CountRedoTime(g_redoWorker->timeCostList[TIME_COST_STEP_2]); +#ifdef ENABLE_SS_MULTIMASTER + if (record == NULL) { + xlogreader->isEnd = true; + } + xlogReaderList[minIndex] = xlogreader; + minIndex = get_min_xlogreader(xlogReaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogReaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + } +#endif RedoInterruptCallBack(); ADD_ABNORMAL_POSITION(8); } @@ -3085,7 +3175,11 @@ void SegWorkerMain() Assert(GetCurrentXLogRecParseType(redoblockstateHead) == PARSE_TYPE_SEG); GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_3]); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(g_redoWorker->nextPrunePtr, redoblockstateHead->blockparse.blockhead.logic_lsn)) { +#else if (XLByteLT(g_redoWorker->nextPrunePtr, redoblockstateHead->blockparse.blockhead.end_ptr)) { +#endif OnDemandSegWorkerRedoSegParseState(redoblockstateHead); } else { ReleaseBlockParseStateIfNotReplay(redoblockstateHead); @@ -3158,7 +3252,11 @@ void HashMapManagerMain() GetRedoStartTime(g_redoWorker->timeCostList[TIME_COST_STEP_1]); while ((g_redoWorker->slotId == SEG_PROC_PIPELINE_SLOT) && !SPSCBlockingQueueIsEmpty(g_dispatcher->segQueue)) { XLogRecParseState *segRecord = (XLogRecParseState *)SPSCBlockingQueueTop(g_dispatcher->segQueue); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(ckptRedoPtr, segRecord->blockparse.blockhead.logic_lsn)) { +#else if (XLByteLT(ckptRedoPtr, segRecord->blockparse.blockhead.end_ptr)) { +#endif break; } #ifdef USE_ASSERT_CHECKING @@ -3860,7 +3958,11 @@ void OndemandRequestPrimaryDoCkptIfNeed() bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate) { XLogRecPtr ckptRedoPtr = g_redoWorker->nextPrunePtr; +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(redoblockstate->blockparse.blockhead.logic_lsn, ckptRedoPtr) || SS_ONDEMAND_REALTIME_BUILD_SHUTDOWN) { +#else if (XLByteLT(redoblockstate->blockparse.blockhead.end_ptr, ckptRedoPtr) || SS_ONDEMAND_REALTIME_BUILD_SHUTDOWN) { +#endif return false; } return true; @@ -3908,4 +4010,4 @@ void RealtimeBuildReleaseRecoveryLatch(int code, Datum arg) { } } -} // namespace ondemand_extreme_rto \ No newline at end of file +} // namespace ondemand_extreme_rto diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp index bf5dc39fec2f5c45380e4013af4366ce7ffdc59a..7a0460d2a076348739baf1822cf34bfab2b99930 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/redo_utils.cpp @@ -458,8 +458,15 @@ XLogRecParseState *OndemandRedoReloadXLogRecord(XLogRecParseState *redoblockstat XLogReaderState *xlogreader = XLogReaderAllocate(&SimpleXLogPageReadInFdCache, &readPrivate); // do not use pre-read // step1: read record +#ifdef ENABLE_SS_MULTIMASTER + xlogreader->xlogPath = redoblockstate->blockparse.blockhead.xlog_path; + xlogreader->instId = redoblockstate->blockparse.blockhead.instId; + XLogRecord *record = XLogReadRecord(xlogreader, redoblockstate->blockparse.blockhead.start_ptr, &errormsg, + true, xlogreader->xlogPath); +#else XLogRecord *record = XLogReadRecord(xlogreader, redoblockstate->blockparse.blockhead.start_ptr, &errormsg, true, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); +#endif if (record == NULL) { ereport(PANIC, (errmodule(MOD_REDO), errcode(ERRCODE_LOG), errmsg("[On-demand] reload xlog record failed at %X/%X, spc/db/rel/bucket " @@ -548,7 +555,11 @@ void OnDemandWaitRealtimeBuildShutDown() void OnDemandUpdateRealtimeBuildPrunePtr() { +#ifdef ENABLE_SS_MULTIMASTER + ondemand_extreme_rto::UpdateCheckpointRedoPtrForPrune(t_thrd.shemem_ptr_cxt.XLogCtl->RedoStartLogicLSN); +#else ondemand_extreme_rto::UpdateCheckpointRedoPtrForPrune(t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.redo); +#endif } void OnDemandBackupControlFile(ControlFileData* controlFile) { diff --git a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp index 55f32634621f81282a10a7be985f4d2f182f9126..525e47c7d17dc51c1a348c626c39b11e16ae595f 100644 --- a/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp +++ b/src/gausskernel/storage/access/transam/ondemand_extreme_rto/xlog_read.cpp @@ -572,6 +572,9 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * securec_check_c(errorno, "\0", "\0"); record = (XLogRecord *)state->readRecordBuf; } +#ifdef ENABLE_SS_MULTIMASTER + state->logicLSN = record->logic_lsn; +#endif /* * Special processing if it's an XLOG SWITCH record @@ -691,11 +694,20 @@ typedef struct XLogPageReadPrivate { TimeLineID tli; } XLogPageReadPrivate; +#ifdef ENABLE_SS_MULTIMASTER +void InitXLogFileId(XLogRecPtr targetPagePtr, TimeLineID timeLine, XLogFileId* id, int instId) +{ + XLByteToSeg(targetPagePtr, id->segno); + id->tli = timeLine; + id->instId = instId; +} +#else void InitXLogFileId(XLogRecPtr targetPagePtr, TimeLineID timeLine, XLogFileId* id) { XLByteToSeg(targetPagePtr, id->segno); id->tli = timeLine; } +#endif /* XLogreader callback function, to read a WAL page */ int SimpleXLogPageReadInFdCache(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, @@ -710,7 +722,11 @@ int SimpleXLogPageReadInFdCache(XLogReaderState *xlogreader, XLogRecPtr targetPa int xlogreadfd = -1; bool found = false; +#ifdef ENABLE_SS_MULTIMASTER + InitXLogFileId(targetPagePtr, readprivate->tli, &xlogfileid, xlogreader->instId); +#else InitXLogFileId(targetPagePtr, readprivate->tli, &xlogfileid); +#endif (void)LWLockAcquire(OndemandXLogFileHandleLock, LW_SHARED); entry = (XLogFileIdCacheEntry *)hash_search(t_thrd.storage_cxt.ondemandXLogFileIdCache, diff --git a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp index a34c36918745225feba8e52d6b5a5e30f22345d6..532f8236e7c128347de22cfdd938b01f95e667b9 100755 --- a/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp +++ b/src/gausskernel/storage/access/transam/parallel_recovery/dispatcher.cpp @@ -1627,6 +1627,13 @@ void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *oldState, { newState->ReadRecPtr = oldState->ReadRecPtr; newState->EndRecPtr = oldState->EndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + newState->xlogPath = oldState->xlogPath; + newState->isEnd = oldState->isEnd; + newState->instId = oldState->instId; + newState->logicLSN = oldState->logicLSN; + newState->args = oldState->args; +#endif newState->readSegNo = oldState->readSegNo; newState->readOff = oldState->readOff; newState->readPageTLI = oldState->readPageTLI; diff --git a/src/gausskernel/storage/access/transam/slru.cpp b/src/gausskernel/storage/access/transam/slru.cpp index 63e5a94b98b9198092004f432e3243138fd63f54..59af61733155822c079bb46096a94ded4cec13c9 100644 --- a/src/gausskernel/storage/access/transam/slru.cpp +++ b/src/gausskernel/storage/access/transam/slru.cpp @@ -226,6 +226,75 @@ void SimpleLruInit(SlruCtl ctl, const char *name, int trancheId, int nslots, int securec_check(rc, "\0", "\0"); } +#ifdef ENABLE_SS_MULTIMASTER +void ExtendSimpleLruInit( + SlruCtl ctl, int trancheId, int nslots, int nlsns, LWLock* ctllock, const char* subdir) +{ + SlruShared shared; + bool found = false; + errno_t rc = EOK; + + shared = (SlruShared)palloc0(SimpleLruShmemSize(nslots, nlsns)); + + /* Initialize locks and shared memory area */ + char *ptr = NULL; + Size offset; + int slotno; + + Assert(!found); + + rc = memset_s(shared, sizeof(SlruSharedData), 0, sizeof(SlruSharedData)); + securec_check(rc, "\0", "\0"); + + shared->control_lock = ctllock; + shared->num_slots = nslots; + shared->lsn_groups_per_page = nlsns; + shared->cur_lru_count = 0; + shared->force_check_first_xid = false; + + /* shared->latest_page_number will be set later */ + ptr = (char *)shared; + offset = MAXALIGN(sizeof(SlruSharedData)); + shared->page_buffer = (char **)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(char *)); + shared->page_status = (SlruPageStatus *)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(SlruPageStatus)); + shared->page_dirty = (bool *)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(bool)); + shared->page_number = (int64 *)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(int64)); + shared->page_lru_count = (int *)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(int)); + shared->buffer_locks = (LWLock **)(ptr + offset); + offset += MAXALIGN(nslots * sizeof(LWLock *)); + + if (nlsns > 0) { + shared->group_lsn = (XLogRecPtr *)(ptr + offset); + offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); + } + + ptr += BUFFERALIGN(offset); + + for (slotno = 0; slotno < nslots; slotno++) { + shared->page_buffer[slotno] = ptr; + shared->page_status[slotno] = SLRU_PAGE_EMPTY; + shared->page_dirty[slotno] = false; + shared->page_lru_count[slotno] = 0; + shared->buffer_locks[slotno] = LWLockAssign(trancheId); + ptr += BLCKSZ; + } + + /* + * Initialize the unshared control struct, including directory path. We + * assume caller set PagePrecedes. + */ + ctl->shared = shared; + ctl->do_fsync = true; /* default behavior */ + rc = strncpy_s(ctl->dir, sizeof(ctl->dir), subdir, strlen(subdir)); + securec_check(rc, "\0", "\0"); +} +#endif + /* * Initialize (or reinitialize) a page to zeroes. * diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 758f2c659e465ca1c14aa29e9b0c5ae7836a534d..f1cba2d9d4270d91d16b1caa865faf158f0d795f 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -7709,7 +7709,11 @@ static void xact_redo_prepare(TransactionId xid) void xact_redo(XLogReaderState *record) { +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = record->logicLSN; +#else XLogRecPtr lsn = record->EndRecPtr; +#endif uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS); bool abortXlogNewVersion = (info == XLOG_XACT_ABORT_WITH_XID); diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index 4e9c0e5849722071459de1cba86a284bacdfd396..fd288e4df05d4aa471455d3172ce41063cf6e2fa 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -190,7 +190,10 @@ g_instance.attr.attr_storage.dms_attr.instance_id != \ g_instance.dms_cxt.SSReformerControl.primaryInstId) #else -#define SS_STANDBY_INST_SKIP_SHARED_FILE false +#define SS_STANDBY_INST_SKIP_SHARED_FILE false +#define SS_REDO_ID 0 +#define SS_REDO_MODE (ENABLE_DMS && (SS_MY_INST_ID == SS_REDO_ID)) +#define SS_IS_FORK_RECORD(state) (state->args != NULL && state->instId != SS_MY_INST_ID) #endif #define RecoveryFromDummyStandby() (t_thrd.postmaster_cxt.ReplConnArray[2] != NULL && IS_DN_DUMMY_STANDYS_MODE()) @@ -6038,6 +6041,16 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in /* This is the first try to read this page. */ t_thrd.xlog_cxt.failedSources = 0; +#ifdef ENABLE_SS_MULTIMASTER + int tmpReadFile; + XLogSegNo tmpReadSegNo; + if (SS_IS_FORK_RECORD(xlogreader)) { + tmpReadFile = t_thrd.xlog_cxt.readFile; + tmpReadSegNo = t_thrd.xlog_cxt.readSegNo; + t_thrd.xlog_cxt.readFile = xlogreader->args->readFile; + t_thrd.xlog_cxt.readSegNo = xlogreader->args->readSegNo; + } +#endif for (;;) { char *errormsg = NULL; @@ -6047,9 +6060,17 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in record = XLogReadRecord(xlogreader, RecPtr, &errormsg); } +#ifdef ENABLE_SS_MULTIMASTER + if (!SS_IS_FORK_RECORD(xlogreader)) { + t_thrd.xlog_cxt.ReadRecPtr = xlogreader->ReadRecPtr; + t_thrd.xlog_cxt.EndRecPtr = xlogreader->EndRecPtr; + g_instance.comm_cxt.predo_cxt.redoPf.read_ptr = t_thrd.xlog_cxt.ReadRecPtr; + } +#else t_thrd.xlog_cxt.ReadRecPtr = xlogreader->ReadRecPtr; t_thrd.xlog_cxt.EndRecPtr = xlogreader->EndRecPtr; g_instance.comm_cxt.predo_cxt.redoPf.read_ptr = t_thrd.xlog_cxt.ReadRecPtr; +#endif if (record == NULL) { if (t_thrd.xlog_cxt.readFile >= 0) { @@ -6094,9 +6115,23 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in if (record != NULL) { /* Set up lastest valid record */ +#ifdef ENABLE_SS_MULTIMASTER + if (!SS_IS_FORK_RECORD(xlogreader)) { + latestValidRecord = t_thrd.xlog_cxt.ReadRecPtr; + latestRecordCrc = record->xl_crc; + latestRecordLen = record->xl_tot_len; + } else { + xlogreader->args->readFile = t_thrd.xlog_cxt.readFile; + xlogreader->args->readSegNo = t_thrd.xlog_cxt.readSegNo; + t_thrd.xlog_cxt.readFile = tmpReadFile; + t_thrd.xlog_cxt.readSegNo = tmpReadSegNo; + } + xlogreader->logicLSN = record->logic_lsn; +#else latestValidRecord = t_thrd.xlog_cxt.ReadRecPtr; latestRecordCrc = record->xl_crc; latestRecordLen = record->xl_tot_len; +#endif if (SS_DORADO_CLUSTER) { t_thrd.xlog_cxt.ssXlogReadFailedTimes = 0; } @@ -6203,6 +6238,14 @@ static XLogRecord *ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, in dummyStandbyMode ? "true" : "false", t_thrd.xlog_cxt.recoveryTriggered ? "true" : "false"))); } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_IS_FORK_RECORD(xlogreader)) { + xlogreader->args->readFile = t_thrd.xlog_cxt.readFile; + xlogreader->args->readSegNo = t_thrd.xlog_cxt.readSegNo; + t_thrd.xlog_cxt.readFile = tmpReadFile; + t_thrd.xlog_cxt.readSegNo = tmpReadSegNo; + } +#endif return NULL; } } @@ -9663,6 +9706,68 @@ static inline void set_hot_standby_recycle_xid() closedir(dir); } +#ifdef ENABLE_SS_MULTIMASTER +void NodeArgsSwitch(XLogReaderArgs *args) +{ + t_thrd.shemem_ptr_cxt.ControlFile = args->ControlFile; + t_thrd.xact_cxt.ShmemVariableCache = args->ShmemVariableCache; + t_thrd.shemem_ptr_cxt.MultiXactState = args->MultiXactState; + t_thrd.shemem_ptr_cxt.ClogCtl = args->ExtendClogCtl; + t_thrd.shemem_ptr_cxt.CsnlogCtlPtr = args->ExtendCsnlogCtlPtr; +} + +void CurrentArgsCopy(XLogReaderArgs *args) +{ + args->ControlFile = t_thrd.shemem_ptr_cxt.ControlFile; + args->ShmemVariableCache = t_thrd.xact_cxt.ShmemVariableCache; + args->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; + args->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; + args->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; +} + +static int get_min_xlogreader(XLogReaderState **xlogreaderList, int instanceNum) +{ + int minIndex = -1; + XLogRecord *record = NULL; + XLogRecPtr minLogicRecPtr = MAX_XLOG_REC_PTR; + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->isEnd) { + continue; + } + if (xlogreaderList[i]->logicLSN < minLogicRecPtr) { + minIndex = i; + minLogicRecPtr = xlogreaderList[i]->logicLSN; + } + } + return minIndex; +} + +void SlruInit(XLogReaderArgs *args, int id) +{ + int i; + int rc = 0; + char dir[MAXPGPATH]; + + args->ExtendClogCtl = (SlruCtlData*)palloc0(NUM_CLOG_PARTITIONS * sizeof(SlruCtlData)); + args->ExtendCsnlogCtlPtr = (SlruCtlData*)palloc0(NUM_CSNLOG_PARTITIONS * sizeof(SlruCtlData)); + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_clog", id); + securec_check_ss(rc, "", ""); + for (i = 0; i < NUM_CLOG_PARTITIONS; i++) { + ExtendSimpleLruInit(&args->ExtendClogCtl[i], (int)LWTRANCHE_CLOG_CTL, (int)CLOGShmemBuffers(), CLOG_XACTS_PER_PAGE / 32, + CBufMappingPartitionLockByIndex(i), dir); + } + + rc = snprintf_s(dir, MAXPGPATH, MAXPGPATH - 1, + "%s/%s%d", g_instance.attr.attr_storage.dss_attr.ss_dss_vg_name, "pg_csnlog", id); + securec_check_ss(rc, "\0", "\0"); + for (i = 0; i < NUM_CSNLOG_PARTITIONS; i++) { + ExtendSimpleLruInit(&args->ExtendCsnlogCtlPtr[i], LWTRANCHE_CSNLOG_CTL, CSNLOGShmemBuffers(), 0, + CSNBufMappingPartitionLockByIndex(i), dir); + } +} +#endif /* * This must be called ONCE during postmaster or standalone-backend startup */ @@ -9688,6 +9793,11 @@ void StartupXLOG(void) bool backupFromRoach = false; DBState dbstate_at_startup; XLogReaderState *xlogreader = NULL; +#ifdef ENABLE_SS_MULTIMASTER + int instanceNum = 0; + XLogReaderState **xlogreaderList = NULL; + XLogReaderArgs **xlogReaderArgsList = NULL; +#endif XLogPageReadPrivate readprivate; bool RecoveryByPending = false; /* recovery caused by pending mode */ bool ArchiveRecoveryByPending = false; /* archive recovery caused by pending mode */ @@ -9752,9 +9862,11 @@ void StartupXLOG(void) g_instance.wal_cxt.walInsertStatusTable[i].LRC = i; } - g_instance.wal_cxt.lastWalStatusEntryFlushed = -1; - g_instance.wal_cxt.lastLRCScanned = WAL_SCANNED_LRC_INIT; - g_instance.wal_cxt.lastLRCFlushed = WAL_SCANNED_LRC_INIT; + if (!SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { + g_instance.wal_cxt.lastWalStatusEntryFlushed = -1; + g_instance.wal_cxt.lastLRCScanned = WAL_SCANNED_LRC_INIT; + g_instance.wal_cxt.lastLRCFlushed = WAL_SCANNED_LRC_INIT; + } /* * Read control file and check XLOG status looks valid. @@ -9763,6 +9875,60 @@ void StartupXLOG(void) * not do ReadControlFile() here, but might as well do it to be sure. */ if (ENABLE_DMS && ENABLE_DSS) { +#ifdef ENABLE_SS_MULTIMASTER + int src_id = INVALID_INSTANCEID; + SSReadControlFile(REFORM_CTRL_PAGE); + if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { + src_id = SS_ONDEMAND_REALTIME_RECOVERY_ID; + OnDemandBackupControlFile(t_thrd.shemem_ptr_cxt.ControlFile); + SSReadControlFile(src_id); + } else if (SS_REDO_MODE) { + src_id = SS_MY_INST_ID; + SSDisasterGetXlogPathList(); + xlogReaderArgsList = (XLogReaderArgs **)palloc(DMS_MAX_INSTANCE * sizeof(XLogReaderArgs *)); + for (int i = 0; i < DMS_MAX_INSTANCE; i++) { + if (g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i][0] == '\0') { + break; + } + xlogReaderArgsList[i] = (XLogReaderArgs *)palloc(sizeof(XLogReaderArgs)); + instanceNum++; + if (i == SS_MY_INST_ID) { + SSReadControlFile(i); + xlogReaderArgsList[i]->ControlFile = t_thrd.shemem_ptr_cxt.ControlFile; + xlogReaderArgsList[i]->checkPointUndo = &checkPointUndo; + xlogReaderArgsList[i]->ShmemVariableCache = t_thrd.xact_cxt.ShmemVariableCache; + xlogReaderArgsList[i]->MultiXactState = t_thrd.shemem_ptr_cxt.MultiXactState; + xlogReaderArgsList[i]->ExtendClogCtl = t_thrd.shemem_ptr_cxt.ClogCtl; + xlogReaderArgsList[i]->ExtendCsnlogCtlPtr = t_thrd.shemem_ptr_cxt.CsnlogCtlPtr; + } else { + xlogReaderArgsList[i]->ControlFile = (ControlFileData *)palloc(sizeof(ControlFileData)); + xlogReaderArgsList[i]->checkPointUndo = (CheckPointUndo *)palloc(sizeof(CheckPointUndo)); + xlogReaderArgsList[i]->ShmemVariableCache = (VariableCacheData *)palloc(sizeof(VariableCacheData)); + xlogReaderArgsList[i]->MultiXactState = (MultiXactStateData *)palloc(SSMultiXactSize()); + + SSReadControlFile(i, false, xlogReaderArgsList[i]->ControlFile); + SlruInit(xlogReaderArgsList[i], i); + xlogReaderArgsList[i]->checkPointUndo->next_csn = COMMITSEQNO_FIRST_NORMAL + 1; + xlogReaderArgsList[i]->checkPointUndo->length = 0; + xlogReaderArgsList[i]->checkPointUndo->recent_global_xmin = InvalidTransactionId; + xlogReaderArgsList[i]->checkPointUndo->globalRecycleXid = InvalidTransactionId; + errno_t rc = memset_s(xlogReaderArgsList[i]->ShmemVariableCache,sizeof(VariableCacheData),0,sizeof(VariableCacheData)); + securec_check(rc, "\0", "\0"); + rc = memset_s(xlogReaderArgsList[i]->MultiXactState, sizeof(SSMultiXactSize()), 0, sizeof(SSMultiXactSize())); + securec_check(rc, "", ""); + } + xlogReaderArgsList[i]->readFile = -1; + xlogReaderArgsList[i]->readSegNo = 0; + } + } else { + SSCSNLOGShmemClear(); + SSCLOGShmemClear(); + SSMultiXactShmemClear(); + src_id = SS_MY_INST_ID; + SSReadControlFile(SS_MY_INST_ID); + } + g_instance.dms_cxt.SSRecoveryInfo.recovery_inst_id = src_id; +#else int src_id = INVALID_INSTANCEID; SSReadControlFile(REFORM_CTRL_PAGE); if (SS_CLUSTER_ONDEMAND_NOT_NORAML && SS_PRIMARY_MODE) { @@ -9790,6 +9956,7 @@ void StartupXLOG(void) } g_instance.dms_cxt.SSRecoveryInfo.recovery_inst_id = src_id; SSReadControlFile(src_id); +#endif } else { ReadControlFile(); } @@ -9990,11 +10157,35 @@ void StartupXLOG(void) if (ENABLE_DMS && ENABLE_DSS) { SSGetRecoveryXlogPath(); +#ifdef ENABLE_SS_MULTIMASTER + if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD || !SS_REDO_MODE) { + xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + close_readFile_if_open(); + } else { + xlogreaderList = (XLogReaderState **)palloc(instanceNum * sizeof(XLogReaderState *)); + for (int i = 0; i < instanceNum; i++) { + xlogreaderList[i] = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); + if (xlogreaderList[i] == NULL) { + ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor"))); + } + + close_readFile_if_open(); + xlogreaderList[i]->xlogPath = g_instance.dms_cxt.SSRecoveryInfo.xlog_list[i]; + xlogreaderList[i]->instId = i; + xlogreaderList[i]->isEnd = false; + xlogreaderList[i]->args = xlogReaderArgsList[i]; + xlogreaderList[i]->system_identifier = xlogReaderArgsList[i]->ControlFile->system_identifier; + } + xlogreader = xlogreaderList[SS_MY_INST_ID]; + } +#else if (SS_DISASTER_CLUSTER) { SSDisasterGetXlogPathList(); } xlogreader = SSXLogReaderAllocate(&SSXLogPageRead, &readprivate, ALIGNOF_BUFFER); close_readFile_if_open(); +#endif if (SS_STANDBY_FAILOVER || SS_STANDBY_PROMOTING || SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { // init shared memory set page empty SSCSNLOGShmemClear(); @@ -10167,12 +10358,51 @@ void StartupXLOG(void) } } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->instId == SS_MY_INST_ID) { + continue; + } + ControlFileData *tempControlFile = xlogreaderList[i]->args->ControlFile; + checkPointLoc = tempControlFile->checkPoint; + + record = ReadCheckpointRecord(xlogreaderList[i], checkPointLoc, 1); + if (record != NULL) { + ereport(DEBUG1, (errmsg("MultiRedo inst %d checkpoint record is at %X/%X", i, (uint32)(checkPointLoc >> 32), + (uint32)checkPointLoc))); + } else { + checkPointLoc = tempControlFile->prevCheckPoint; + record = ReadCheckpointRecord(xlogreaderList[i], checkPointLoc, 2); + if (record != NULL) { + ereport(LOG, (errmsg("MultiRedo inst %d using previous checkpoint record at %X/%X", i, (uint32)(checkPointLoc >> 32), + (uint32)checkPointLoc))); + t_thrd.xlog_cxt.InRecovery = true; /* force recovery even if SHUTDOWNED */ + } else { + ereport(PANIC, (errmsg("MultiRedo inst %d could not locate a valid checkpoint record", i))); + } + } + rcm = memcpy_s(xlogReaderArgsList[i]->checkPointUndo, sizeof(checkPointUndo), XLogRecGetData(xlogreaderList[i]), sizeof(checkPointUndo)); + securec_check(rcm, "", ""); + } + } +#endif /* * Get the last valid checkpoint record. If the latest one according * to pg_control is broken, try the next-to-last one. */ checkPointLoc = t_thrd.shemem_ptr_cxt.ControlFile->checkPoint; t_thrd.xlog_cxt.RedoStartLSN = t_thrd.shemem_ptr_cxt.ControlFile->checkPointCopy.redo; +#ifdef ENABLE_SS_MULTIMASTER + if (SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { + record = ReadRecord(xlogreaderList[SS_ONDEMAND_REALTIME_RECOVERY_ID], xlogReaderArgsList[SS_ONDEMAND_REALTIME_RECOVERY_ID]->ControlFile->checkPointCopy.redo, LOG, false); + if (record != NULL) { + t_thrd.shemem_ptr_cxt.XLogCtl->RedoStartLogicLSN = record->logic_lsn; + } else { + t_thrd.shemem_ptr_cxt.XLogCtl->RedoStartLogicLSN = InvalidXLogRecPtr; + } + } +#endif g_instance.comm_cxt.predo_cxt.redoPf.redo_start_ptr = t_thrd.xlog_cxt.RedoStartLSN; record = ReadCheckpointRecord(xlogreader, checkPointLoc, 1); @@ -10269,6 +10499,48 @@ void StartupXLOG(void) } #endif +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->instId == SS_MY_INST_ID) { + continue; + } + CheckPointUndo *tmpCheckPoint = xlogreaderList[i]->args->checkPointUndo; + /* we are the only one to change these args during startup */ + NodeArgsSwitch(xlogreaderList[i]->args); + + /* initialize shared memory variables from the checkpoint record */ + t_thrd.xact_cxt.ShmemVariableCache->nextXid = tmpCheckPoint->ori_checkpoint.nextXid; + t_thrd.xact_cxt.ShmemVariableCache->nextOid = tmpCheckPoint->ori_checkpoint.nextOid; + t_thrd.xact_cxt.ShmemVariableCache->oidCount = 0; + MultiXactSetNextMXact(tmpCheckPoint->ori_checkpoint.nextMulti, tmpCheckPoint->ori_checkpoint.nextMultiOffset); + SetTransactionIdLimit(tmpCheckPoint->ori_checkpoint.oldestXid, tmpCheckPoint->ori_checkpoint.oldestXidDB); + SetMultiXactIdLimit(FirstMultiXactId, TemplateDbOid); + xlogreaderList[i]->args->ckptXid = tmpCheckPoint->ori_checkpoint.oldestXid; + + latestCompletedXid = tmpCheckPoint->ori_checkpoint.nextXid; + TransactionIdRetreat(latestCompletedXid); + t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid = latestCompletedXid; + + t_thrd.xact_cxt.ShmemVariableCache->xmin = t_thrd.xact_cxt.ShmemVariableCache->nextXid; + t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin = latestCompletedXid + 1; + t_thrd.xact_cxt.ShmemVariableCache->startupMaxXid = t_thrd.xact_cxt.ShmemVariableCache->nextXid; + + t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = tmpCheckPoint->next_csn; + /* initialize shared memory variable standbyXmin from checkpoint record */ + if (!TransactionIdIsValid(t_thrd.xact_cxt.ShmemVariableCache->standbyXmin)) { + if (recordLen == CHECKPOINTUNDO_LEN) { + t_thrd.xact_cxt.ShmemVariableCache->standbyXmin = tmpCheckPoint->recent_global_xmin; + } else if (wasShutdown) { + t_thrd.xact_cxt.ShmemVariableCache->standbyXmin = tmpCheckPoint->ori_checkpoint.nextXid - 1; + } + } + t_thrd.xact_cxt.ShmemVariableCache->recentGlobalXmin = InvalidTransactionId; + t_thrd.xact_cxt.ShmemVariableCache->xlogMaxCSN = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo - 1; + } + NodeArgsSwitch(xlogreaderList[SS_MY_INST_ID]->args); + } +#endif /* initialize shared memory variables from the checkpoint record */ t_thrd.xact_cxt.ShmemVariableCache->nextXid = checkPoint.nextXid; t_thrd.xact_cxt.ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -10412,9 +10684,11 @@ void StartupXLOG(void) StartupCSNLOG(); - t_thrd.xlog_cxt.lastFullPageWrites = checkPoint.fullPageWrites; - t_thrd.xlog_cxt.RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr = - t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + if (!SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD) { + t_thrd.xlog_cxt.lastFullPageWrites = checkPoint.fullPageWrites; + t_thrd.xlog_cxt.RedoRecPtr = t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr = + t_thrd.shemem_ptr_cxt.XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + } if (ENABLE_INCRE_CKPT) { t_thrd.xlog_cxt.doPageWrites = false; @@ -10454,7 +10728,11 @@ void StartupXLOG(void) } } +#ifdef ENABLE_SS_MULTIMASTER + if (!SS_REDO_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { +#else if (SS_STANDBY_MODE && t_thrd.xlog_cxt.InRecovery == true && SS_ONDEMAND_REALTIME_BUILD_DISABLED) { +#endif /* do not need replay anything in SS standby mode */ ereport(LOG, (errmsg("[SS] Skip redo replay in standby mode"))); t_thrd.xlog_cxt.InRecovery = false; @@ -10780,7 +11058,41 @@ void StartupXLOG(void) record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + if (record == NULL) { + xlogreader->isEnd = true; + } + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->instId == SS_MY_INST_ID) { + continue; + } + XLogRecPtr tmpCheckPointLoc = xlogreaderList[i]->args->ControlFile->checkPoint; + XLogRecPtr tmpRedoPointLoc = xlogreaderList[i]->args->checkPointUndo->ori_checkpoint.redo; + if (XLByteLT(tmpCheckPointLoc, tmpRedoPointLoc)) { + ereport(PANIC, (errmsg("MultiRedo inst %d invalid redo in checkpoint record", i))); + } + if (XLByteLT(tmpRedoPointLoc, tmpCheckPointLoc)) { + t_thrd.xlog_cxt.InRecovery = true; + record = ReadRecord(xlogreaderList[i], checkPoint.redo, PANIC, false); + } else { + record = ReadRecord(xlogreaderList[i], InvalidXLogRecPtr, LOG, false); + if (record == NULL) { + xlogreaderList[i]->isEnd = true; + } + } + } + int minIndex = get_min_xlogreader(xlogreaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogreaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + xlogctl->lastReplayedLogicLSN = record->logic_lsn; + } + } + XLogReaderState *oldXlogReader = xlogreaderList[SS_MY_INST_ID]; +#else XLogReaderState *oldXlogReader = xlogreader; +#endif if (record != NULL) { bool recoveryContinue = true; @@ -10795,7 +11107,11 @@ void StartupXLOG(void) t_thrd.xlog_cxt.InRedo = true; g_instance.comm_cxt.predo_cxt.redoPf.preEndPtr = 0; ResourceManagerStartup(); +#ifndef ENABLE_SS_MULTIMASTER StartUpMultiRedo(xlogreader, sizeof(readprivate)); +#else + StartUpMultiRedo(xlogreader, xlogreaderList, instanceNum, sizeof(readprivate)); +#endif if (IsExtremeRedo()) { xlogreader->isPRProcess = true; @@ -10987,6 +11303,18 @@ void StartupXLOG(void) } else { xlogreader = newXlogReader; record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + if (record == NULL) { + xlogreader->isEnd = true; + } + int minIndex = get_min_xlogreader(xlogreaderList, instanceNum); + if (minIndex != -1) { + xlogreader = xlogreaderList[minIndex]; + record = (XLogRecord *)xlogreader->readRecordBuf; + } + } +#endif } CountRedoTime(t_thrd.xlog_cxt.timeCost[TIME_COST_STEP_1]); } while (record != NULL); // end of main redo apply loop @@ -10996,6 +11324,20 @@ void StartupXLOG(void) } else { SendRecoveryEndMarkToWorkersAndWaitForFinish(0); } +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) { + xlogreader = xlogreaderList[SS_MY_INST_ID]; + for (int i = 0; i < instanceNum; i++) { + if (xlogreaderList[i]->instId == SS_MY_INST_ID) { + continue; + } + for (int j = 0; j < NUM_CLOG_PARTITIONS; j++) + (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendClogCtl[j], false); + for (int j = 0; j < NUM_CSNLOG_PARTITIONS; j++) + (void)SimpleLruFlush(&xlogreaderList[i]->args->ExtendCsnlogCtlPtr[j], false); + } + } +#endif RecoveryXlogReader(oldXlogReader, xlogreader); if (!(IS_OBS_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE)) { @@ -11139,6 +11481,14 @@ void StartupXLOG(void) * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ +#ifdef ENABLE_SS_MULTIMASTER + do { + record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false); + t_thrd.xlog_cxt.LastRec = t_thrd.xlog_cxt.ReadRecPtr; + } while (record != NULL); + record = ReadRecord(xlogreader, t_thrd.xlog_cxt.ReadRecPtr, LOG, false); + EndOfLog = t_thrd.xlog_cxt.EndRecPtr; +#else xlogreader->readSegNo = 0; xlogreader->readOff = 0; xlogreader->readLen = 0; @@ -11146,6 +11496,7 @@ void StartupXLOG(void) UpdateTermFromXLog(record->xl_term); EndOfLog = t_thrd.xlog_cxt.EndRecPtr; +#endif XLByteToPrevSeg(EndOfLog, endLogSegNo); if (!SS_DISASTER_STANDBY_CLUSTER && ((ENABLE_DMS && SS_STANDBY_FAILOVER) || SS_STANDBY_PROMOTING)) { @@ -11402,7 +11753,12 @@ void StartupXLOG(void) #ifndef ENABLE_MULTIPLE_NODES StartupReplicationOrigin(); #endif +#ifdef ENABLE_SS_MULTIMASTER + if (SS_REDO_MODE) + TrimCLOG(); +#else TrimCLOG(); +#endif /* Reload shared-memory state for prepared transactions */ RecoverPreparedTransactions(); @@ -11425,8 +11781,15 @@ void StartupXLOG(void) ShutdownReadFileFacility(); /* Shut down the xlog reader facility. */ +#ifdef ENABLE_SS_MULTIMASTER + for (int i = 0; i < instanceNum; i++) { + XLogReaderFree(xlogreaderList[i]); + xlogreaderList[i] = NULL; + } +#else XLogReaderFree(xlogreader); xlogreader = NULL; +#endif XLogReportParameters(); @@ -17052,13 +17415,18 @@ XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI, XLogRecPtr *ReplayReadPtr) return recptr; } -void SetXLogReplayRecPtr(XLogRecPtr readRecPtr, XLogRecPtr endRecPtr) +void SetXLogReplayRecPtr(XLogRecPtr readRecPtr, XLogRecPtr endRecPtr, uint32 logicLSN) { bool isUpdated = false; XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; SpinLockAcquire(&xlogctl->info_lck); +#ifdef ENABLE_SS_MULTIMASTER + if (XLByteLT(xlogctl->lastReplayedLogicLSN, logicLSN)) { + xlogctl->lastReplayedLogicLSN = logicLSN; +#else if (XLByteLT(xlogctl->lastReplayedReadRecPtr, endRecPtr)) { +#endif xlogctl->lastReplayedReadRecPtr = readRecPtr; xlogctl->lastReplayedEndRecPtr = endRecPtr; g_instance.comm_cxt.predo_cxt.redoPf.last_replayed_end_ptr = endRecPtr; @@ -21204,8 +21572,18 @@ int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int re read_len = SSStreamReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); } else { +#ifdef ENABLE_SS_MULTIMASTER + if (xlogreader->xlogPath) { + read_len = SSReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, + readBuf, readTLI, xlogreader->xlogPath); + } else { + read_len = SSReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, + readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); + } +#else read_len = SSReadXLog(xlogreader, targetPagePtr, reqLen, targetRecPtr, readBuf, readTLI, g_instance.dms_cxt.SSRecoveryInfo.recovery_xlog_dir); +#endif } return read_len; } diff --git a/src/gausskernel/storage/access/transam/xlogreader.cpp b/src/gausskernel/storage/access/transam/xlogreader.cpp index 8891a678b221d8933c602634198c82676f373b46..3a4deac9bc3cf6a794710a8840586344cdf4d273 100644 --- a/src/gausskernel/storage/access/transam/xlogreader.cpp +++ b/src/gausskernel/storage/access/transam/xlogreader.cpp @@ -480,6 +480,9 @@ XLogRecord *XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **err securec_check_c(errorno, "\0", "\0"); record = (XLogRecord *)state->readRecordBuf; } +#ifdef ENABLE_SS_MULTIMASTER + state->logicLSN = record->logic_lsn; +#endif /* * Special processing if it's an XLOG SWITCH record @@ -995,6 +998,9 @@ bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errorms state->decoded_record = record; state->record_origin = InvalidRepOriginId; +#ifdef ENABLE_SS_MULTIMASTER + state->logicLSN = record->logic_lsn; +#endif ptr = (char *)record; ptr += SizeOfXLogRecord; diff --git a/src/gausskernel/storage/access/transam/xlogutils.cpp b/src/gausskernel/storage/access/transam/xlogutils.cpp index de74c1dc714219859cf1751e01b2f382fab9ecce..94cf29e9aabdbe4c37bdae167a012c5e23e75d4e 100644 --- a/src/gausskernel/storage/access/transam/xlogutils.cpp +++ b/src/gausskernel/storage/access/transam/xlogutils.cpp @@ -779,8 +779,13 @@ XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record, uint8 bloc tde = InsertTdeInfoToCache(blockinfo.rnode, record->blocks[block_id].tdeinfo); } +#ifdef ENABLE_SS_MULTIMASTER + redoaction = XLogReadBufferForRedoBlockExtend(&blockinfo, mode, get_cleanup_lock, bufferinfo, record->logicLSN, + record->blocks[block_id].last_lsn, willinit, readmethod, tde); +#else redoaction = XLogReadBufferForRedoBlockExtend(&blockinfo, mode, get_cleanup_lock, bufferinfo, record->EndRecPtr, record->blocks[block_id].last_lsn, willinit, readmethod, tde); +#endif if (redoaction == BLK_NOTFOUND) { return BLK_NOTFOUND; } diff --git a/src/gausskernel/storage/lmgr/lwlock.cpp b/src/gausskernel/storage/lmgr/lwlock.cpp index 3f98bb4ff0cf836318809179435d714f2c7f653b..f2b5a1abb9fa6bcff4333493d240281012ce9def 100644 --- a/src/gausskernel/storage/lmgr/lwlock.cpp +++ b/src/gausskernel/storage/lmgr/lwlock.cpp @@ -387,6 +387,16 @@ int NumLWLocks(void) * backendLock is actually not allocated. */ numLocks += (2 * GLOBAL_ALL_PROCS - g_instance.attr.attr_storage.max_prepared_xacts * NUM_TWOPHASE_PARTITIONS); +#ifdef ENABLE_SS_MULTIMASTER + /* clog.c needs one per CLOG buffer */ + numLocks += DMS_MAX_INSTANCES * CLOGShmemBuffers(); + + /* csnlog.c needs one per CLOG buffer */ + numLocks += DMS_MAX_INSTANCES * NUM_CSNLOG_PARTITIONS * CSNLOGShmemBuffers(); + + /* clog.c needs one per CLOG buffer */ + numLocks += DMS_MAX_INSTANCES * NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); +#else /* clog.c needs one per CLOG buffer */ numLocks += CLOGShmemBuffers(); @@ -395,6 +405,7 @@ int NumLWLocks(void) /* clog.c needs one per CLOG buffer */ numLocks += NUM_CLOG_PARTITIONS * CLOGShmemBuffers(); +#endif /* multixact.c needs two SLRU areas */ if (ENABLE_DSS) { diff --git a/src/gausskernel/storage/smgr/segment/segxlog.cpp b/src/gausskernel/storage/smgr/segment/segxlog.cpp index ca364f22defeaf5a8020a9d854b26b0fbab4154b..a24582899919736ce0c55cdd196c481c0419ff93 100644 --- a/src/gausskernel/storage/smgr/segment/segxlog.cpp +++ b/src/gausskernel/storage/smgr/segment/segxlog.cpp @@ -685,7 +685,11 @@ static void redo_create_extent_group(XLogReaderState *record) /* Create SegSpace object in memory */ SegSpace *spc = spc_init_space_node(rnode->spcNode, rnode->dbNode); +#ifdef ENABLE_SS_MULTIMASTER + eg_init_data_files(&spc->extent_group[EXTENT_TYPE_TO_GROUPID(rnode->relNode)][forknum], true, record->logicLSN); +#else eg_init_data_files(&spc->extent_group[EXTENT_TYPE_TO_GROUPID(rnode->relNode)][forknum], true, record->EndRecPtr); +#endif } static void redo_init_map_page(XLogReaderState *record) @@ -766,14 +770,22 @@ static void redo_space_drop(XLogReaderState *record) XLogDropSegmentSpace(spcNode, dbNode); } +#ifdef ENABLE_SS_MULTIMASTER +void seg_redo_new_page_copy_and_flush(BufferTag *tag, char *data, XLogRecPtr lsn, uint64 logic_lsn) +#else void seg_redo_new_page_copy_and_flush(BufferTag *tag, char *data, XLogRecPtr lsn) +#endif { char page[BLCKSZ] __attribute__((__aligned__(ALIGNOF_BUFFER))) = {0}; errno_t er = memcpy_s(page, BLCKSZ, data, BLCKSZ); securec_check(er, "\0", "\0"); +#ifdef ENABLE_SS_MULTIMASTER + PageSetLSN(page, logic_lsn); +#else PageSetLSN(page, lsn); +#endif PageSetChecksumInplace(page, tag->blockNum); if (FORCE_FINISH_ENABLED) { @@ -812,7 +824,11 @@ static void redo_new_page(XLogReaderState *record) { Assert(record != NULL); BufferTag *tag = (BufferTag *)XLogRecGetData(record); +#ifdef ENABLE_SS_MULTIMASTER + seg_redo_new_page_copy_and_flush(tag, (char *)XLogRecGetData(record) + sizeof(BufferTag), record->EndRecPtr, record->logicLSN); +#else seg_redo_new_page_copy_and_flush(tag, (char *)XLogRecGetData(record) + sizeof(BufferTag), record->EndRecPtr); +#endif } void segpage_smgr_redo(XLogReaderState *record) diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 84858123cb36c30e042f9d96398120c948212ab8..1a20f8ccb28a9f8aeab7be9e068d6c66e72aed3d 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -811,7 +811,11 @@ static BlockNumber bucket_alloc_segment(Oid tablespace_id, Oid database_id, Bloc ((PageHeader)main_head_page)->pd_lower += sizeof(BktMainHead); BktMainHead *main_head = (BktMainHead *)PageGetContents(main_head_page); +#ifdef ENABLE_SS_MULTIMASTER + XLogRecPtr lsn = GetXLogInsertLogicLSN(); +#else XLogRecPtr lsn = GetXLogInsertRecPtr(); +#endif main_head->magic = BUCKET_SEGMENT_MAGIC; main_head->lsn = lsn; main_head->redis_info.redis_xid = InvalidTransactionId; diff --git a/src/include/access/extreme_rto/dispatcher.h b/src/include/access/extreme_rto/dispatcher.h index 60b2d65b0b9601091ce30473514ca576d4031af8..1e5055f0eb073a9413cae8c0b50fbb55c0d84354 100644 --- a/src/include/access/extreme_rto/dispatcher.h +++ b/src/include/access/extreme_rto/dispatcher.h @@ -120,6 +120,13 @@ typedef struct RecordBufferState { XLogRecPtr targetRecPtr; XLogRecPtr expectLsn; uint32 waitRedoDone; +#ifdef ENABLE_SS_MULTIMASTER + XLogReaderState **mulInitReaders; + char **mulReadBuf; + char **mulErrorMsgBuf; + void **mulReadPrivate; + uint32 instanceNum; +#endif } RecordBufferState; typedef struct { @@ -207,7 +214,12 @@ inline int get_trxn_redo_worker_num() return TRXN_REDO_WORKER_NUM; } +#ifndef ENABLE_SS_MULTIMASTER void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen); +#else +void StartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen); +#endif + /* RedoItem lifecycle. */ void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime); diff --git a/src/include/access/extreme_rto_redo_api.h b/src/include/access/extreme_rto_redo_api.h index 9b35e4cf44c9e232a42a40a6d157366c3fb83ed2..910350b220694747b1a59bd091cb8b7e841c6b87 100644 --- a/src/include/access/extreme_rto_redo_api.h +++ b/src/include/access/extreme_rto_redo_api.h @@ -69,7 +69,11 @@ void ExtremeClearRecoveryThreadHashTbl(const RelFileNode &node, ForkNumber forkn bool segment_shrink); void ExtremeBatchClearRecoveryThreadHashTbl(Oid spcNode, Oid dbNode); bool ExtremeRedoWorkerIsUndoSpaceWorker(); +#ifndef ENABLE_SS_MULTIMASTER void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen); +#else +void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen); +#endif void ExtremeDispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime); void ExtremeGetThreadNameIfPageRedoWorker(int argc, char *argv[], char **threadNamePtr); PGPROC *ExtremeStartupPidGetProc(ThreadId pid); diff --git a/src/include/access/multi_redo_api.h b/src/include/access/multi_redo_api.h index 8f07d35a7563ef611e394e7714b1f84706e7ecc4..3a827824b1f147ee3c30cb634b4f2f8441e1c6ab 100644 --- a/src/include/access/multi_redo_api.h +++ b/src/include/access/multi_redo_api.h @@ -122,7 +122,11 @@ PageRedoExitStatus CheckExitPageWorkers(ThreadId pid); void SetMyPageRedoWorker(knl_thread_arg* arg); uint32 GetMyPageRedoWorkerId(); void MultiRedoMain(); +#ifndef ENABLE_SS_MULTIMASTER void StartUpMultiRedo(XLogReaderState* xlogreader, uint32 privateLen); +#else +void StartUpMultiRedo(XLogReaderState *xlogreader, XLogReaderState **xlogreaderList, int instanceNum, uint32 privateLen); +#endif void ProcTxnWorkLoad(bool force); void EndDispatcherContext(); diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h index 5e3d8d11d224f080dc493766de04b155313d605f..a9e0e0c898f6305c163b9cd9d084a74770e86493 100644 --- a/src/include/access/multixact.h +++ b/src/include/access/multixact.h @@ -146,4 +146,8 @@ extern void SSMultiXactShmemClear(void); extern void SSGetMultiXactIdMembers(MultiXactId multi, int *nmembers, MultiXactMember **members, int owner); +#ifdef ENABLE_SS_MULTIMASTER +extern Size SSMultiXactSize(void); +#endif + #endif /* MULTIXACT_H */ diff --git a/src/include/access/ondemand_extreme_rto/dispatcher.h b/src/include/access/ondemand_extreme_rto/dispatcher.h index dae92770fc6eb75c7d9e2365919ba53459b5e851..62451c98181658a307439facdb1afb24dd9e0925 100644 --- a/src/include/access/ondemand_extreme_rto/dispatcher.h +++ b/src/include/access/ondemand_extreme_rto/dispatcher.h @@ -126,6 +126,13 @@ typedef struct RecordBufferState { XLogRecPtr targetRecPtr; XLogRecPtr expectLsn; uint32 waitRedoDone; +#ifdef ENABLE_SS_MULTIMASTER + XLogReaderState **mulInitReaders; + char **mulReadBuf; + char **mulErrorMsgBuf; + void **mulReadPrivate; + uint32 instanceNum; +#endif } RecordBufferState; typedef struct { @@ -228,7 +235,11 @@ inline int get_trxn_redo_worker_num() return TRXN_REDO_WORKER_NUM; } +#ifndef ENABLE_SS_MULTIMASTER void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen); +#else +void StartRecoveryWorkers(XLogReaderState *xlogreader, XLogReaderState **xlogReaderList, int instanceNum, uint32 privateLen); +#endif /* RedoItem lifecycle. */ void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime); diff --git a/src/include/access/ondemand_extreme_rto/xlog_read.h b/src/include/access/ondemand_extreme_rto/xlog_read.h index 10cb8c2515510d5a31fa26d160cabf9333c060e5..20ca75e75ca30045acf8fa2514247a9fbbc9917d 100644 --- a/src/include/access/ondemand_extreme_rto/xlog_read.h +++ b/src/include/access/ondemand_extreme_rto/xlog_read.h @@ -38,6 +38,9 @@ XLogRecord *ParallelReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char * typedef struct XLogFileId { XLogSegNo segno; TimeLineID tli; +#ifdef ENABLE_SS_MULTIMASTER + int instId; +#endif } XLogFileId; typedef struct XLogFileIdCacheEntry { diff --git a/src/include/access/slru.h b/src/include/access/slru.h index b8b1d556ab812ee81fd28ca3f67eb13c7ce64f9b..9181f2ce32f091f973603609448ffab6eff78d8b 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -176,4 +176,9 @@ extern bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, const char* filename, int64 s extern void SimpleLruSetPageEmpty( SlruCtl ctl, const char* name, int trancheId, int nslots, int nlsns, LWLock* ctllock, const char* subdir, int index = 0); +#ifdef ENABLE_SS_MULTIMASTER +extern void ExtendSimpleLruInit( + SlruCtl ctl, int trancheId, int nslots, int nlsns, LWLock* ctllock, const char* subdir); +#endif + #endif /* SLRU_H */ diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c7db8ecf252dae0d00c970c65a19d9fb79f39204..32106e545a32e721236c5f587e07432bdf007bcc 100755 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -587,6 +587,10 @@ typedef struct XLogCtlData { */ XLogRecPtr lastReplayedEndRecPtr; XLogRecPtr replayEndRecPtr; +#ifdef ENABLE_SS_MULTIMASTER + uint32 lastReplayedLogicLSN; + uint32 RedoStartLogicLSN; +#endif /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ TimestampTz recoveryLastXTime; /* current effective recovery target timeline */ @@ -734,7 +738,7 @@ extern bool XLogInsertAllowed(void); extern bool SSModifySharedLunAllowed(void); extern void GetXLogReceiptTime(TimestampTz* rtime, bool* fromStream); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID* targetTLI, XLogRecPtr* ReplayReadPtr = NULL); -extern void SetXLogReplayRecPtr(XLogRecPtr readRecPtr, XLogRecPtr endRecPtr); +extern void SetXLogReplayRecPtr(XLogRecPtr readRecPtr, XLogRecPtr endRecPtr, uint32 logicLSN = 0); extern void DumpXlogCtl(); extern void CheckRecoveryConsistency(void); diff --git a/src/include/access/xlog_basic.h b/src/include/access/xlog_basic.h index 771a4780e3572785aec116b7603a391fd50c1721..03cdb88974ef71e76dd28cd6c3692e34d028aebf 100644 --- a/src/include/access/xlog_basic.h +++ b/src/include/access/xlog_basic.h @@ -232,6 +232,23 @@ typedef struct XLogRecord { /* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */ } XLogRecord; +#ifdef ENABLE_SS_MULTIMASTER +typedef struct XLogReaderArgs { + struct ControlFileData* ControlFile; + struct CheckPointUndo* checkPointUndo; + struct VariableCacheData* ShmemVariableCache; + struct MultiXactStateData* MultiXactState; + bool XLogDelayFlag; + TransactionId ckptXid; + int readFile; + XLogSegNo readSegNo; + struct SlruCtlData* ExtendClogCtl; + struct SlruCtlData* ExtendCsnlogCtlPtr; + struct SlruCtlData* ExtendMultiXactOffsetCtl; + struct SlruCtlData* ExtendMultiXactMemberCtl; +} XLogReaderArgs; +#endif + struct XLogReaderState { /* * @@ -355,6 +372,13 @@ struct XLogReaderState { uint32 refcount; bool isTde; uint32 readblocks; +#ifdef ENABLE_SS_MULTIMASTER + char* xlogPath; + bool isEnd; + int instId; + XLogRecPtr logicLSN; /* current logic lsn */ + XLogReaderArgs* args; +#endif }; #define SizeOfXLogRecord (offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32c)) @@ -480,5 +504,10 @@ typedef struct ShareStorageOperateCtl_ { extern List *readTimeLineHistory(TimeLineID targetTLI); +#ifdef ENABLE_SS_MULTIMASTER +extern void NodeArgsSwitch(XLogReaderArgs *args); +extern void CurrentArgsCopy(XLogReaderArgs *args); +#endif + #endif /* XLOG_BASIC_H */ diff --git a/src/include/access/xlogproc.h b/src/include/access/xlogproc.h index 81343c5ef149642079329d0fdc23aa661f0a9873..2da801de0a6235c9ee0e5ae536adb796d3e1fdaf 100755 --- a/src/include/access/xlogproc.h +++ b/src/include/access/xlogproc.h @@ -49,7 +49,11 @@ typedef void (*relasexlogreadstate)(void* record); #define XLogBlockHeadGetXid(blockhead) ((blockhead)->xl_xid) #define XLogBlockHeadGetRmid(blockhead) ((blockhead)->xl_rmid) +#ifdef ENABLE_SS_MULTIMASTER +#define XLogBlockHeadGetLSN(blockhead) ((blockhead)->logic_lsn) +#else #define XLogBlockHeadGetLSN(blockhead) ((blockhead)->end_ptr) +#endif #define XLogBlockHeadGetRelNode(blockhead) ((blockhead)->relNode) #define XLogBlockHeadGetSpcNode(blockhead) ((blockhead)->spcNode) #define XLogBlockHeadGetDbNode(blockhead) ((blockhead)->dbNode) @@ -511,6 +515,11 @@ typedef struct { uint2 opt; bool is_conflict_type; /* whether wal log type is conflict with standby read if redo */ XLogPhyBlock pblk; +#ifdef ENABLE_SS_MULTIMASTER + char* xlog_path; + int instId; + uint32 logic_lsn; /* copy from XLogReaderState's LastLogicLSN */ +#endif } XLogBlockHead; #define XLogBlockHeadEncodeSize (sizeof(XLogBlockHead)) @@ -1298,7 +1307,11 @@ extern void GistRedoDataBlock(XLogBlockHead *blockhead, XLogBlockDataParse *bloc extern bool IsCheckPoint(const XLogRecParseState *parseState); bool is_backup_end(const XLogRecParseState *parse_state); void redo_atomic_xlog_dispatch(uint8 opCode, RedoBufferInfo *redo_buf, const char *data); +#ifdef ENABLE_SS_MULTIMASTER +void seg_redo_new_page_copy_and_flush(BufferTag *tag, char *data, XLogRecPtr lsn, uint64 logic_lsn); +#else void seg_redo_new_page_copy_and_flush(BufferTag *tag, char *data, XLogRecPtr lsn); +#endif void redo_target_page(const BufferTag& buf_tag, StandbyReadLsnInfoArray* lsn_info, Buffer base_page_buf); void MarkSegPageRedoChildPageDirty(RedoBufferInfo *bufferinfo); diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 1c8d47b3169698efdded1a8aaa9a97b1877e2250..3c69bf0a35cde16ae17db2ed38c9ce4893d6af10 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -147,7 +147,7 @@ typedef struct ControlFileData { */ DBState state; /* see enum above */ pg_time_t time; /* time stamp of last pg_control update */ - XLogRecPtr checkPoint; /* last check point record ptr */ + XLogRecPtr checkPoint; /* last check point record ptr */ XLogRecPtr prevCheckPoint; /* previous check point record ptr */ CheckPoint checkPointCopy; /* copy of last check point record */ diff --git a/src/include/ddes/dms/ss_init.h b/src/include/ddes/dms/ss_init.h index 25c69a9d10c92863fd3e1428d08ce5beaa64e85a..fa26f48dc25bdcc7256439613a355f0908539f45 100644 --- a/src/include/ddes/dms/ss_init.h +++ b/src/include/ddes/dms/ss_init.h @@ -45,6 +45,9 @@ #define SS_OFFICIAL_PRIMARY true #define SS_OFFICIAL_RECOVERY_NODE (SS_MY_INST_ID == SS_RECOVERY_ID) #endif +#ifdef ENABLE_SS_MULTIMASTER +#define SS_ONDEMAND_REALTIME_RECOVERY_ID SS_MY_INST_ID ? (SS_MY_INST_ID - 1) : (g_instance.attr.attr_storage.dms_attr.inst_count - 1) +#endif void DMSInit(); void DMSUninit(); diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index e638ea4b10c4fa5c7eb649cf654f1c566616d262..7e41fcad7102963d20d5855f684cbd47986849c4 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -46,7 +46,7 @@ void SSGetRecoveryXlogPath(); char* SSGetNextXLogPath(TimeLineID tli, XLogRecPtr startptr); void SSDisasterGetXlogPathList(); void SSUpdateReformerCtrl(); -void SSReadControlFile(int id, bool updateDmsCtx = false); +void SSReadControlFile(int id, bool updateDmsCtx = false, ControlFileData* temp = NULL); void SSClearSegCache(); int SSCancelTransactionOfAllStandby(SSBroadcastOp type); int SSProcessCancelTransaction(SSBroadcastOp type);