mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-01 19:58:06 +08:00
pitr for walkv and for 8 parallel apply
This commit is contained in:
parent
e275d69cd5
commit
f68204187e
@ -991,6 +991,7 @@ static void readRecoverySignalFile(void);
|
||||
static void validateRecoveryParameters(void);
|
||||
static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
|
||||
static bool recoveryStopsBefore(XLogReaderState *record);
|
||||
static bool he3recoveryStopsAfter(XLogReaderState *record);
|
||||
static bool recoveryStopsAfter(XLogReaderState *record);
|
||||
static void ConfirmRecoveryPaused(void);
|
||||
static void recoveryPausesHere(bool endOfRecovery);
|
||||
@ -7382,8 +7383,8 @@ recoveryStopsBefore(XLogReaderState *record)
|
||||
* Ignore recovery target settings when not in archive recovery (meaning
|
||||
* we are in crash recovery).
|
||||
*/
|
||||
//if (!ArchiveRecoveryRequested)
|
||||
// return false;
|
||||
if (!ArchiveRecoveryRequested)
|
||||
return false;
|
||||
|
||||
/* Check if we should stop as soon as reaching consistency */
|
||||
if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
|
||||
@ -7511,6 +7512,89 @@ recoveryStopsBefore(XLogReaderState *record)
|
||||
return stopsHere;
|
||||
}
|
||||
|
||||
static bool
|
||||
he3recoveryStopsAfter(XLogReaderState *record)
|
||||
{
|
||||
bool stopsHere = false;
|
||||
uint8 xact_info;
|
||||
bool isCommit;
|
||||
TimestampTz recordXtime = 0;
|
||||
TransactionId recordXid;
|
||||
|
||||
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
|
||||
if (XLogRecGetRmid(record) != RM_XACT_ID)
|
||||
return false;
|
||||
|
||||
xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
|
||||
|
||||
if (xact_info == XLOG_XACT_COMMIT)
|
||||
{
|
||||
isCommit = true;
|
||||
recordXid = XLogRecGetXid(record);
|
||||
}
|
||||
else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
|
||||
{
|
||||
xl_xact_commit *xlrec = (xl_xact_commit *)XLogRecGetData(record);
|
||||
xl_xact_parsed_commit parsed;
|
||||
|
||||
isCommit = true;
|
||||
ParseCommitRecord(XLogRecGetInfo(record),
|
||||
xlrec,
|
||||
&parsed);
|
||||
recordXid = parsed.twophase_xid;
|
||||
}
|
||||
else if (xact_info == XLOG_XACT_ABORT)
|
||||
{
|
||||
isCommit = false;
|
||||
recordXid = XLogRecGetXid(record);
|
||||
}
|
||||
else if (xact_info == XLOG_XACT_ABORT_PREPARED)
|
||||
{
|
||||
xl_xact_abort *xlrec = (xl_xact_abort *)XLogRecGetData(record);
|
||||
xl_xact_parsed_abort parsed;
|
||||
|
||||
isCommit = false;
|
||||
ParseAbortRecord(XLogRecGetInfo(record),
|
||||
xlrec,
|
||||
&parsed);
|
||||
recordXid = parsed.twophase_xid;
|
||||
}
|
||||
else
|
||||
return false;
|
||||
|
||||
if (recoveryTarget == RECOVERY_TARGET_TIME &&
|
||||
getRecordTimestamp(record, &recordXtime))
|
||||
{
|
||||
stopsHere = (recordXtime >= recoveryTargetTime);
|
||||
}
|
||||
|
||||
if (stopsHere)
|
||||
{
|
||||
recoveryStopAfter = false;
|
||||
recoveryStopXid = recordXid;
|
||||
recoveryStopTime = recordXtime;
|
||||
recoveryStopLSN = InvalidXLogRecPtr;
|
||||
recoveryStopName[0] = '\0';
|
||||
|
||||
if (isCommit)
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("recovery stopping before commit of transaction %u, time %s",
|
||||
recoveryStopXid,
|
||||
timestamptz_to_str(recoveryStopTime))));
|
||||
}
|
||||
else
|
||||
{
|
||||
ereport(LOG,
|
||||
(errmsg("recovery stopping before abort of transaction %u, time %s",
|
||||
recoveryStopXid,
|
||||
timestamptz_to_str(recoveryStopTime))));
|
||||
}
|
||||
}
|
||||
|
||||
return stopsHere;
|
||||
}
|
||||
|
||||
/*
|
||||
* Same as recoveryStopsBefore, but called after applying the record.
|
||||
*
|
||||
@ -9461,6 +9545,15 @@ void StartupXLOG(void)
|
||||
}
|
||||
g_redoStartLsn[2] = record->xl_end;
|
||||
g_redoStartLsn[1] = record->xl_end - record->xl_tot_len;
|
||||
|
||||
if (he3_point_in_time_recovery && he3recoveryStopsAfter(xlogreader))
|
||||
{
|
||||
DelRangeWals(1, xlogreader->EndRecPtr, 18446744073709551615);
|
||||
pushTikv(0,hashMapSize(),true);
|
||||
while (ReadRecord(xlogreader, LOG, false) != NULL) {
|
||||
}
|
||||
break;
|
||||
}
|
||||
/* Else, try to fetch the next WAL record */
|
||||
record = ReadRecord(xlogreader, LOG, false);
|
||||
} while (record != NULL);
|
||||
@ -10087,7 +10180,8 @@ CheckRecoveryConsistency(void)
|
||||
if (XLogRecPtrIsInvalid(minRecoveryPoint) && !he3_point_in_time_recovery)
|
||||
return;
|
||||
|
||||
Assert(InArchiveRecovery);
|
||||
if (!he3_point_in_time_recovery)
|
||||
Assert(InArchiveRecovery);
|
||||
|
||||
/*
|
||||
* assume that we are called in the startup process, and hence don't need
|
||||
|
@ -995,7 +995,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
||||
* always have left a zero-filled buffer, complain if not PageIsNew.
|
||||
*/
|
||||
bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
|
||||
if (!PageIsNew((Page) bufBlock))
|
||||
if (!InRecovery && !PageIsNew((Page) bufBlock))
|
||||
ereport(PANIC,
|
||||
(errmsg("unexpected data beyond EOF in block %u of relation %s",
|
||||
blockNum, relpath(smgr->smgr_rnode, forkNum)),
|
||||
@ -1067,9 +1067,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
||||
if (isExtend)
|
||||
{
|
||||
/* new buffers are zero-filled */
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
if (!InRecovery || !found) {
|
||||
MemSet((char *) bufBlock, 0, BLCKSZ);
|
||||
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
|
||||
}
|
||||
/* don't set checksum for all-zero page */
|
||||
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
|
||||
|
||||
/* for new page precache */
|
||||
if (*preCacheNodesCountPtr > 0)
|
||||
|
Loading…
Reference in New Issue
Block a user