diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c index dd0c124..06d08c5 100644 --- a/contrib/pg_visibility/pg_visibility.c +++ b/contrib/pg_visibility/pg_visibility.c @@ -398,7 +398,7 @@ pg_truncate_visibility_map(PG_FUNCTION_ARGS) if (BlockNumberIsValid(block)) { fork = VISIBILITYMAP_FORKNUM; - smgrtruncate(rel->rd_smgr, &fork, 1, &block); + smgrtruncatelsn(rel->rd_smgr, &fork, 1, &block); } if (RelationNeedsWAL(rel)) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 00f3a63..0d096de 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5808,7 +5808,7 @@ static void xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionId xid, XLogRecPtr lsn, - RepOriginId origin_id) + RepOriginId origin_id, XLogRecPtr startlsn) { TransactionId max_xid; TimestampTz commit_time; @@ -5916,6 +5916,15 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, if (push_standby) { pushTikv(0, hashMapSize(), true); + } else { + XLogRecPtr consistPtr; + consistPtr = GetXLogPushToDisk(); + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, commit lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } } /* Make sure files supposed to be dropped are dropped */ @@ -5957,7 +5966,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, */ static void xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, - XLogRecPtr lsn, RepOriginId origin_id) + XLogRecPtr lsn, RepOriginId origin_id, XLogRecPtr startlsn) { TransactionId max_xid; @@ -6025,6 +6034,15 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, if (push_standby) { pushTikv(0, hashMapSize(), true); + } else { + XLogRecPtr consistPtr; + consistPtr = GetXLogPushToDisk(); + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, abort lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } } DropRelationFiles(parsed->xnodes, parsed->nrels, true); @@ -6046,7 +6064,7 @@ xact_redo(XLogReaderState *record) ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); } else if (info == XLOG_XACT_COMMIT_PREPARED) { @@ -6055,7 +6073,7 @@ xact_redo(XLogReaderState *record) ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); @@ -6069,7 +6087,7 @@ xact_redo(XLogReaderState *record) ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_abort(&parsed, XLogRecGetXid(record), - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); } else if (info == XLOG_XACT_ABORT_PREPARED) { @@ -6078,7 +6096,7 @@ xact_redo(XLogReaderState *record) ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed); xact_redo_abort(&parsed, parsed.twophase_xid, - record->EndRecPtr, XLogRecGetOrigin(record)); + record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr); /* Delete TwoPhaseState gxact entry and/or 2PC file. */ LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3bbeaad..ba2a871 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -9330,6 +9330,20 @@ void StartupXLOG(void) if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) { pushTikv(0,hashMapSize(),true); } + } else { + RmgrId rmgrId = XLogRecGetRmid(xlogreader); + uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK; + if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) { + XLogRecPtr consistPtr, startlsn; + consistPtr = GetXLogPushToDisk(); + startlsn = record->xl_end - record->xl_tot_len; + while (consistPtr < startlsn) + { + pg_usleep(100000L); + elog(LOG, "standby consist lsn %ld, truncate lsn %ld", consistPtr, startlsn); + consistPtr = GetXLogPushToDisk(); + } + } } RmgrTable[record->xl_rmid].rm_redo(xlogreader); hasReplay = true; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 78e7b0f..8dfc5d7 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1172,6 +1172,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, } } } + bufHdr->pageIsVaild = true; } } else { smgrread(smgr, forkNum, blockNum, (char *) bufBlock); @@ -1476,8 +1477,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * If we get here, previous attempts to read the buffer must * have failed ... but we shall bravely try again. */ + if (buf->pageIsVaild == false) { + *exist = false; + } else { + *exist = true; + } *foundPtr = false; - *exist = true; } } @@ -1696,6 +1701,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * If we get here, previous attempts to read the buffer * must have failed ... but we shall bravely try again. */ + if (buf->pageIsVaild == false) { + *exist = false; + } else { + *exist = true; + } *foundPtr = false; } } @@ -1748,7 +1758,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; - + buf->pageIsVaild = false; UnlockBufHdr(buf, buf_state); if (oldPartitionLock != NULL) @@ -1766,8 +1776,14 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * to read it before we did, so there's nothing left for BufferAlloc() to * do. */ - if (StartBufferIO(buf, true)) + if (StartBufferIO(buf, true)) { + if (buf->pageIsVaild == false) { + *exist = false; + } else { + *exist = true; + } *foundPtr = false; + } else *foundPtr = true; diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 8949419..a4d836b 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -773,7 +773,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo pageKey.relfileNode.relNode = pageTag.rnode.relNode; pageKey.forkNo = (uint32)pageTag.forkNum; pageKey.blkNo = pageTag.blockNum; - pageKey.pageLsn = 0; + pageKey.pageLsn = GetXLogPushToDisk(); pageKey.replyLsn = lsn; odpk.pk = pageKey; @@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo return nbytes; } - pageKey.pageLsn = PageGetLSN(*buffer); + pageKey.pageLsn = GetXLogPushToDisk(); pageKey.replyLsn = lsn; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 9661f83..e8b5674 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -197,6 +197,7 @@ typedef struct BufferDesc int wait_backend_pid; /* backend PID of pin-count waiter */ int freeNext; /* link in freelist chain */ LWLock content_lock; /* to lock access to buffer contents */ + bool pageIsVaild; } BufferDesc; extern BufferDesc **bulk_io_in_progress_buf;