diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0fb6186..aa8c405 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -991,6 +991,7 @@ static void readRecoverySignalFile(void); static void validateRecoveryParameters(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog); static bool recoveryStopsBefore(XLogReaderState *record); +static bool he3recoveryStopsAfter(XLogReaderState *record); static bool recoveryStopsAfter(XLogReaderState *record); static void ConfirmRecoveryPaused(void); static void recoveryPausesHere(bool endOfRecovery); @@ -7382,8 +7383,8 @@ recoveryStopsBefore(XLogReaderState *record) * Ignore recovery target settings when not in archive recovery (meaning * we are in crash recovery). */ - //if (!ArchiveRecoveryRequested) - // return false; + if (!ArchiveRecoveryRequested) + return false; /* Check if we should stop as soon as reaching consistency */ if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency) @@ -7511,6 +7512,89 @@ recoveryStopsBefore(XLogReaderState *record) return stopsHere; } +static bool +he3recoveryStopsAfter(XLogReaderState *record) +{ + bool stopsHere = false; + uint8 xact_info; + bool isCommit; + TimestampTz recordXtime = 0; + TransactionId recordXid; + + /* Otherwise we only consider stopping before COMMIT or ABORT records. */ + if (XLogRecGetRmid(record) != RM_XACT_ID) + return false; + + xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; + + if (xact_info == XLOG_XACT_COMMIT) + { + isCommit = true; + recordXid = XLogRecGetXid(record); + } + else if (xact_info == XLOG_XACT_COMMIT_PREPARED) + { + xl_xact_commit *xlrec = (xl_xact_commit *)XLogRecGetData(record); + xl_xact_parsed_commit parsed; + + isCommit = true; + ParseCommitRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; + } + else if (xact_info == XLOG_XACT_ABORT) + { + isCommit = false; + recordXid = XLogRecGetXid(record); + } + else if (xact_info == XLOG_XACT_ABORT_PREPARED) + { + xl_xact_abort *xlrec = (xl_xact_abort *)XLogRecGetData(record); + xl_xact_parsed_abort parsed; + + isCommit = false; + ParseAbortRecord(XLogRecGetInfo(record), + xlrec, + &parsed); + recordXid = parsed.twophase_xid; + } + else + return false; + + if (recoveryTarget == RECOVERY_TARGET_TIME && + getRecordTimestamp(record, &recordXtime)) + { + stopsHere = (recordXtime >= recoveryTargetTime); + } + + if (stopsHere) + { + recoveryStopAfter = false; + recoveryStopXid = recordXid; + recoveryStopTime = recordXtime; + recoveryStopLSN = InvalidXLogRecPtr; + recoveryStopName[0] = '\0'; + + if (isCommit) + { + ereport(LOG, + (errmsg("recovery stopping before commit of transaction %u, time %s", + recoveryStopXid, + timestamptz_to_str(recoveryStopTime)))); + } + else + { + ereport(LOG, + (errmsg("recovery stopping before abort of transaction %u, time %s", + recoveryStopXid, + timestamptz_to_str(recoveryStopTime)))); + } + } + + return stopsHere; +} + /* * Same as recoveryStopsBefore, but called after applying the record. * @@ -9461,6 +9545,15 @@ void StartupXLOG(void) } g_redoStartLsn[2] = record->xl_end; g_redoStartLsn[1] = record->xl_end - record->xl_tot_len; + + if (he3_point_in_time_recovery && he3recoveryStopsAfter(xlogreader)) + { + DelRangeWals(1, xlogreader->EndRecPtr, 18446744073709551615); + pushTikv(0,hashMapSize(),true); + while (ReadRecord(xlogreader, LOG, false) != NULL) { + } + break; + } /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogreader, LOG, false); } while (record != NULL); @@ -10087,7 +10180,8 @@ CheckRecoveryConsistency(void) if (XLogRecPtrIsInvalid(minRecoveryPoint) && !he3_point_in_time_recovery) return; - Assert(InArchiveRecovery); + if (!he3_point_in_time_recovery) + Assert(InArchiveRecovery); /* * assume that we are called in the startup process, and hence don't need diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index b5398da..a4edf27 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -995,7 +995,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * always have left a zero-filled buffer, complain if not PageIsNew. */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); - if (!PageIsNew((Page) bufBlock)) + if (!InRecovery && !PageIsNew((Page) bufBlock)) ereport(PANIC, (errmsg("unexpected data beyond EOF in block %u of relation %s", blockNum, relpath(smgr->smgr_rnode, forkNum)), @@ -1067,9 +1067,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isExtend) { /* new buffers are zero-filled */ - MemSet((char *) bufBlock, 0, BLCKSZ); + if (!InRecovery || !found) { + MemSet((char *) bufBlock, 0, BLCKSZ); + smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false); + } /* don't set checksum for all-zero page */ - smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false); /* for new page precache */ if (*preCacheNodesCountPtr > 0)