!132 pitr for walkv and for 8 parallel apply

Merge pull request !132 from 裴庭伟/dev_performance
This commit is contained in:
shipixian 2023-05-08 01:24:21 +00:00 committed by Gitee
commit 59deab39ca
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
2 changed files with 102 additions and 6 deletions

View File

@ -991,6 +991,7 @@ static void readRecoverySignalFile(void);
static void validateRecoveryParameters(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
static bool recoveryStopsBefore(XLogReaderState *record);
static bool he3recoveryStopsAfter(XLogReaderState *record);
static bool recoveryStopsAfter(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
static void recoveryPausesHere(bool endOfRecovery);
@ -7382,8 +7383,8 @@ recoveryStopsBefore(XLogReaderState *record)
* Ignore recovery target settings when not in archive recovery (meaning
* we are in crash recovery).
*/
//if (!ArchiveRecoveryRequested)
// return false;
if (!ArchiveRecoveryRequested)
return false;
/* Check if we should stop as soon as reaching consistency */
if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
@ -7511,6 +7512,89 @@ recoveryStopsBefore(XLogReaderState *record)
return stopsHere;
}
static bool
he3recoveryStopsAfter(XLogReaderState *record)
{
bool stopsHere = false;
uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *)XLogRecGetData(record);
xl_xact_parsed_commit parsed;
isCommit = true;
ParseCommitRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *)XLogRecGetData(record);
xl_xact_parsed_abort parsed;
isCommit = false;
ParseAbortRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else
return false;
if (recoveryTarget == RECOVERY_TARGET_TIME &&
getRecordTimestamp(record, &recordXtime))
{
stopsHere = (recordXtime >= recoveryTargetTime);
}
if (stopsHere)
{
recoveryStopAfter = false;
recoveryStopXid = recordXid;
recoveryStopTime = recordXtime;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopName[0] = '\0';
if (isCommit)
{
ereport(LOG,
(errmsg("recovery stopping before commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else
{
ereport(LOG,
(errmsg("recovery stopping before abort of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
}
return stopsHere;
}
/*
* Same as recoveryStopsBefore, but called after applying the record.
*
@ -9461,6 +9545,15 @@ void StartupXLOG(void)
}
g_redoStartLsn[2] = record->xl_end;
g_redoStartLsn[1] = record->xl_end - record->xl_tot_len;
if (he3_point_in_time_recovery && he3recoveryStopsAfter(xlogreader))
{
DelRangeWals(1, xlogreader->EndRecPtr, 18446744073709551615);
pushTikv(0,hashMapSize(),true);
while (ReadRecord(xlogreader, LOG, false) != NULL) {
}
break;
}
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
@ -10087,7 +10180,8 @@ CheckRecoveryConsistency(void)
if (XLogRecPtrIsInvalid(minRecoveryPoint) && !he3_point_in_time_recovery)
return;
Assert(InArchiveRecovery);
if (!he3_point_in_time_recovery)
Assert(InArchiveRecovery);
/*
* assume that we are called in the startup process, and hence don't need

View File

@ -995,7 +995,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* always have left a zero-filled buffer, complain if not PageIsNew.
*/
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
if (!PageIsNew((Page) bufBlock))
if (!InRecovery && !PageIsNew((Page) bufBlock))
ereport(PANIC,
(errmsg("unexpected data beyond EOF in block %u of relation %s",
blockNum, relpath(smgr->smgr_rnode, forkNum)),
@ -1067,9 +1067,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (isExtend)
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
if (!InRecovery || !found) {
MemSet((char *) bufBlock, 0, BLCKSZ);
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
}
/* don't set checksum for all-zero page */
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
/* for new page precache */
if (*preCacheNodesCountPtr > 0)