Replayed LSN should exceed pushtodisk when replaying truncate or drop table

Signed-off-by: shipixian <shipixian_yewu@cmss.chinamobile.com>
This commit is contained in:
shipixian 2023-06-09 16:20:36 +08:00
parent 72929ac58f
commit d74712c27a
6 changed files with 61 additions and 12 deletions

View File

@ -398,7 +398,7 @@ pg_truncate_visibility_map(PG_FUNCTION_ARGS)
if (BlockNumberIsValid(block))
{
fork = VISIBILITYMAP_FORKNUM;
smgrtruncate(rel->rd_smgr, &fork, 1, &block);
smgrtruncatelsn(rel->rd_smgr, &fork, 1, &block);
}
if (RelationNeedsWAL(rel))

View File

@ -5808,7 +5808,7 @@ static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
XLogRecPtr lsn,
RepOriginId origin_id)
RepOriginId origin_id, XLogRecPtr startlsn)
{
TransactionId max_xid;
TimestampTz commit_time;
@ -5916,6 +5916,15 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
if (push_standby)
{
pushTikv(0, hashMapSize(), true);
} else {
XLogRecPtr consistPtr;
consistPtr = GetXLogPushToDisk();
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, commit lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
/* Make sure files supposed to be dropped are dropped */
@ -5957,7 +5966,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
*/
static void
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
XLogRecPtr lsn, RepOriginId origin_id)
XLogRecPtr lsn, RepOriginId origin_id, XLogRecPtr startlsn)
{
TransactionId max_xid;
@ -6025,6 +6034,15 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
if (push_standby)
{
pushTikv(0, hashMapSize(), true);
} else {
XLogRecPtr consistPtr;
consistPtr = GetXLogPushToDisk();
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, abort lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
@ -6046,7 +6064,7 @@ xact_redo(XLogReaderState *record)
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
}
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
@ -6055,7 +6073,7 @@ xact_redo(XLogReaderState *record)
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_commit(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
@ -6069,7 +6087,7 @@ xact_redo(XLogReaderState *record)
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, XLogRecGetXid(record),
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
@ -6078,7 +6096,7 @@ xact_redo(XLogReaderState *record)
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
xact_redo_abort(&parsed, parsed.twophase_xid,
record->EndRecPtr, XLogRecGetOrigin(record));
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

View File

@ -9330,6 +9330,20 @@ void StartupXLOG(void)
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
pushTikv(0,hashMapSize(),true);
}
} else {
RmgrId rmgrId = XLogRecGetRmid(xlogreader);
uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
XLogRecPtr consistPtr, startlsn;
consistPtr = GetXLogPushToDisk();
startlsn = record->xl_end - record->xl_tot_len;
while (consistPtr < startlsn)
{
pg_usleep(100000L);
elog(LOG, "standby consist lsn %ld, truncate lsn %ld", consistPtr, startlsn);
consistPtr = GetXLogPushToDisk();
}
}
}
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
hasReplay = true;

View File

@ -1172,6 +1172,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
}
}
}
bufHdr->pageIsVaild = true;
}
} else {
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
@ -1476,8 +1477,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* If we get here, previous attempts to read the buffer must
* have failed ... but we shall bravely try again.
*/
if (buf->pageIsVaild == false) {
*exist = false;
} else {
*exist = true;
}
*foundPtr = false;
*exist = true;
}
}
@ -1696,6 +1701,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* If we get here, previous attempts to read the buffer
* must have failed ... but we shall bravely try again.
*/
if (buf->pageIsVaild == false) {
*exist = false;
} else {
*exist = true;
}
*foundPtr = false;
}
}
@ -1748,7 +1758,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
else
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
buf->pageIsVaild = false;
UnlockBufHdr(buf, buf_state);
if (oldPartitionLock != NULL)
@ -1766,8 +1776,14 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
* to read it before we did, so there's nothing left for BufferAlloc() to
* do.
*/
if (StartBufferIO(buf, true))
if (StartBufferIO(buf, true)) {
if (buf->pageIsVaild == false) {
*exist = false;
} else {
*exist = true;
}
*foundPtr = false;
}
else
*foundPtr = true;

View File

@ -773,7 +773,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
pageKey.relfileNode.relNode = pageTag.rnode.relNode;
pageKey.forkNo = (uint32)pageTag.forkNum;
pageKey.blkNo = pageTag.blockNum;
pageKey.pageLsn = 0;
pageKey.pageLsn = GetXLogPushToDisk();
pageKey.replyLsn = lsn;
odpk.pk = pageKey;
@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
return nbytes;
}
pageKey.pageLsn = PageGetLSN(*buffer);
pageKey.pageLsn = GetXLogPushToDisk();
pageKey.replyLsn = lsn;

View File

@ -197,6 +197,7 @@ typedef struct BufferDesc
int wait_backend_pid; /* backend PID of pin-count waiter */
int freeNext; /* link in freelist chain */
LWLock content_lock; /* to lock access to buffer contents */
bool pageIsVaild;
} BufferDesc;
extern BufferDesc **bulk_io_in_progress_buf;