From 2b11b804274b3774331490fe73e06f7083851410 Mon Sep 17 00:00:00 2001 From: shenzhengntu <503699317@qq.com> Date: Wed, 17 May 2023 20:07:10 +0800 Subject: [PATCH] parallel push page MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code Source From: Self Code Description: 【Optional】 Jira: #【Optional】 市场项目编号(名称):【Optional】 --- src/backend/access/transam/xlog.c | 17 +++++++---- src/backend/storage/buffer/bufmgr.c | 46 ++++------------------------- src/backend/storage/smgr/smgr.c | 22 ++++++++++++++ src/include/storage/smgr.h | 1 + 4 files changed, 40 insertions(+), 46 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 97f1485..2c715b8 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8288,6 +8288,18 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en bool valid; BufferDesc *buf; /* See if the block is in the buffer pool already */ + //for pg master he3db slave or backup restore + SMgrRelation smgr = smgropen(rnode, InvalidBackendId); + smgrcreate(smgr, forknum, true); + BlockNumber blockNum = startupsmgrnblocks(smgr, forknum); + static char blkspace[BLCKSZ] = {0}; + if (blockNum != P_NEW) { + for (int i = blockNum;i<=blkno;i++) { + smgrextend(smgr,forknum,i,blkspace,false); + } + } else { + elog(PANIC,"data_buffer_for_replay blockNum is P_NEW"); + } LWLockAcquire(partition_lock, LW_SHARED); buf_id = BufTableLookup(&tag, hash); /* If page is in buffer, we can apply record, otherwise we do nothing */ @@ -8305,11 +8317,6 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en } updateLastReplayLsn(record); ReleaseBuffer(buffer); - /*if (EnableHotStandby == true && push_standby == false) { - if (StartBufferIO(buf, true)) { - pageInMemoryFlushBufferToDisk(&tag); - } - }*/ } else { updateLastReplayLsn(record); LWLockRelease(partition_lock); diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 1e3b6a4..3a7a379 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -995,7 +995,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * always have left a zero-filled buffer, complain if not PageIsNew. */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); - if (!InRecovery && !PageIsNew((Page) bufBlock)) + if (!PageIsNew((Page) bufBlock)) ereport(PANIC, (errmsg("unexpected data beyond EOF in block %u of relation %s", blockNum, relpath(smgr->smgr_rnode, forkNum)), @@ -1067,10 +1067,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isExtend) { /* new buffers are zero-filled */ - if (!InRecovery || !found) { - MemSet((char *) bufBlock, 0, BLCKSZ); - smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false); - } + + MemSet((char *) bufBlock, 0, BLCKSZ); + smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false); + /* don't set checksum for all-zero page */ /* for new page precache */ @@ -1087,42 +1087,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, preCacheNodei++; } } - //parallel replay PageFlushWorkerMain=>ProcFlushBufferToDisk=>XLogReadBufferExtended=>default status RM_NORMAL, - //where init page,status is RBM_ZERO_AND_LOCK will lead to page invaild,so need smgrextend page then to smgrread - //push standby can use ReadWalsByPage to replay base RBM_ZERO page,but slave must be ensure flush page min LSN point - // bigger than push standby provide Point of consistency,if not slave no to replay ,wal list maybe cut to lead to replay - // data wrong - if (!((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) && IsBootstrapProcessingMode() != true && InitdbSingle != true) - { - if (EnableHotStandby == true || InRecovery) { - BufferTag pageTag; - pageTag.rnode = smgr->smgr_rnode.node; - pageTag.forkNum = forkNum; - pageTag.blockNum = blockNum; - replayLsn = GetXLogReplayRecPtr(&tli); - XLogRecPtr pageLsn = BufferGetLSN(bufHdr); - head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn); - if ((EnableHotStandby == true && push_standby == false) || he3mirror) { - if (head->next!=NULL) { - tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head); - } - } else { - LsnNode* next = head->next; - if (next!=NULL) { - walRecord.cap = 8192; - walRecord.buf = malloc(walRecord.cap); - } - while(next!=NULL) { - int count = walRecordQuery(&walRecord.buf,&walRecord.count,&walRecord.cap,next->lsn); - if (count == -1) { - elog(FATAL,"======walRecordQuery query wal Faild %X/%X===1===",LSN_FORMAT_ARGS(next->lsn)); - } - next = next->next; - } - } - } - } /* * NB: we're *not* doing a ScheduleBufferTagForWriteback here; diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 305ce39..e3587fc 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -642,6 +642,28 @@ smgrnblocks(SMgrRelation reln, ForkNumber forknum) return result; } +/* + * smgrnblocks() -- Calculate the number of blocks in the + * supplied relation. + */ +BlockNumber +startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum) +{ + BlockNumber result; + + /* Check and return if we get the cached value for the number of blocks. */ + result = smgrnblocks_cached(reln, forknum); + if (result != InvalidBlockNumber) + return result; + + result = smgrsw[reln->smgr_which].smgr_nblocks(reln, forknum); + + reln->smgr_cached_nblocks[forknum] = result; + + return result; +} + + /* * smgrnblocks_cached() -- Get the cached number of blocks in the supplied * relation. diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 3d25bed..c8c9f1a 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -101,6 +101,7 @@ extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); +extern BlockNumber startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum); extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum); extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nblocks);