parallel push page

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-05-17 20:07:10 +08:00
parent 739e6d4f63
commit 2b11b80427
4 changed files with 40 additions and 46 deletions

View File

@ -8288,6 +8288,18 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en
bool valid;
BufferDesc *buf;
/* See if the block is in the buffer pool already */
//for pg master he3db slave or backup restore
SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
smgrcreate(smgr, forknum, true);
BlockNumber blockNum = startupsmgrnblocks(smgr, forknum);
static char blkspace[BLCKSZ] = {0};
if (blockNum != P_NEW) {
for (int i = blockNum;i<=blkno;i++) {
smgrextend(smgr,forknum,i,blkspace,false);
}
} else {
elog(PANIC,"data_buffer_for_replay blockNum is P_NEW");
}
LWLockAcquire(partition_lock, LW_SHARED);
buf_id = BufTableLookup(&tag, hash);
/* If page is in buffer, we can apply record, otherwise we do nothing */
@ -8305,11 +8317,6 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en
}
updateLastReplayLsn(record);
ReleaseBuffer(buffer);
/*if (EnableHotStandby == true && push_standby == false) {
if (StartBufferIO(buf, true)) {
pageInMemoryFlushBufferToDisk(&tag);
}
}*/
} else {
updateLastReplayLsn(record);
LWLockRelease(partition_lock);

View File

@ -995,7 +995,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* always have left a zero-filled buffer, complain if not PageIsNew.
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!InRecovery && !PageIsNew((Page) bufBlock))
if (!PageIsNew((Page) bufBlock))
ereport(PANIC,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rnode, forkNum)),
@ -1067,10 +1067,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (isExtend)
{
/* new buffers are zero-filled */
if (!InRecovery || !found) {
MemSet((char *) bufBlock, 0, BLCKSZ);
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
}
MemSet((char *) bufBlock, 0, BLCKSZ);
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
/* don't set checksum for all-zero page */
/* for new page precache */
@ -1087,42 +1087,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
preCacheNodei++;
}
}
//parallel replay PageFlushWorkerMain=>ProcFlushBufferToDisk=>XLogReadBufferExtended=>default status RM_NORMAL,
//where init page,status is RBM_ZERO_AND_LOCK will lead to page invaild,so need smgrextend page then to smgrread
//push standby can use ReadWalsByPage to replay base RBM_ZERO page,but slave must be ensure flush page min LSN point
// bigger than push standby provide Point of consistency,if not slave no to replay ,wal list maybe cut to lead to replay
// data wrong
if (!((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
!isLocalBuf) && IsBootstrapProcessingMode() != true && InitdbSingle != true)
{
if (EnableHotStandby == true || InRecovery) {
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
replayLsn = GetXLogReplayRecPtr(&tli);
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
if ((EnableHotStandby == true && push_standby == false) || he3mirror) {
if (head->next!=NULL) {
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
}
} else {
LsnNode* next = head->next;
if (next!=NULL) {
walRecord.cap = 8192;
walRecord.buf = malloc(walRecord.cap);
}
while(next!=NULL) {
int count = walRecordQuery(&walRecord.buf,&walRecord.count,&walRecord.cap,next->lsn);
if (count == -1) {
elog(FATAL,"======walRecordQuery query wal Faild %X/%X===1===",LSN_FORMAT_ARGS(next->lsn));
}
next = next->next;
}
}
}
}
/*
* NB: we're *not* doing a ScheduleBufferTagForWriteback here;

View File

@ -642,6 +642,28 @@ smgrnblocks(SMgrRelation reln, ForkNumber forknum)
return result;
}
/*
* smgrnblocks() -- Calculate the number of blocks in the
* supplied relation.
*/
BlockNumber
startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum)
{
BlockNumber result;
/* Check and return if we get the cached value for the number of blocks. */
result = smgrnblocks_cached(reln, forknum);
if (result != InvalidBlockNumber)
return result;
result = smgrsw[reln->smgr_which].smgr_nblocks(reln, forknum);
reln->smgr_cached_nblocks[forknum] = result;
return result;
}
/*
* smgrnblocks_cached() -- Get the cached number of blocks in the supplied
* relation.

View File

@ -101,6 +101,7 @@ extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern BlockNumber startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
int nforks, BlockNumber *nblocks);