update Flush

Signed-off-by: shipixian <shipixian_yewu@cmss.chinamobile.com>
This commit is contained in:
shipixian 2023-05-16 16:01:22 +08:00
parent 661e0b67f6
commit 8b8084813d

View File

@ -4074,7 +4074,7 @@ void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
// WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
/* if we have already flushed that far, we're done */
if (WriteRqstPtr <= LogwrtResult.Flush)
if (WriteRqstPtr <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return;
}
@ -4409,7 +4409,7 @@ He3DBXLogFlush(XLogRecPtr record)
}
/* Quick exit if already known flushed */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return;
#ifdef WAL_DEBUG
@ -4449,7 +4449,7 @@ He3DBXLogFlush(XLogRecPtr record)
SpinLockRelease(&XLogCtl->info_lck);
/* done already? */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
break;
/*
@ -4479,7 +4479,7 @@ He3DBXLogFlush(XLogRecPtr record)
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
{
// LWLockRelease(WALWriteLock);
break;
@ -4549,11 +4549,11 @@ He3DBXLogFlush(XLogRecPtr record)
* calls from bufmgr.c are not within critical sections and so we will not
* force a restart for a bad LSN on a data page.
*/
if (LogwrtResult.Flush < record)
if ((XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush) < record)
elog(ERROR,
"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
LSN_FORMAT_ARGS((XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)));
}
bool
@ -4740,7 +4740,7 @@ He3DBXLogBackgroundFlush(void)
WriteRqst.Write = WriteRqst.Write >= XLogCtl->asyncXactLSN ? WriteRqst.Write:XLogCtl->asyncXactLSN;
SpinLockRelease(&XLogCtl->info_lck);
if (WriteRqst.Write <= LogwrtResult.Flush)
if (WriteRqst.Write <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
{
return false;
}
@ -4751,7 +4751,7 @@ He3DBXLogBackgroundFlush(void)
*/
now = GetCurrentTimestamp();
flushbytes = WriteRqst.Write - LogwrtResult.Flush;
flushbytes = WriteRqst.Write - (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
if (WalWriterFlushAfter == 0 || lastflush == 0)
{
@ -4799,7 +4799,7 @@ He3DBXLogBackgroundFlush(void)
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
if (WriteRqst.Write > LogwrtResult.Write ||
WriteRqst.Flush > LogwrtResult.Flush)
WriteRqst.Flush > (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
{
He3DBXLogFakeWrite(WriteRqst);
}
@ -4879,7 +4879,7 @@ bool XLogNeedsFlush(XLogRecPtr record)
}
/* Quick exit if already known flushed */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return false;
/* read LogwrtResult and update local state */
@ -4888,7 +4888,7 @@ bool XLogNeedsFlush(XLogRecPtr record)
SpinLockRelease(&XLogCtl->info_lck);
/* check again */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return false;
return true;
@ -9245,10 +9245,12 @@ void StartupXLOG(void)
Insert->PrevBytePos = checkPoint.redo;
Insert->CurrBytePos = checkPoint.redo;
XLogCtl->InitializedUpTo = checkPoint.redo;
LogwrtResult.Write = LogwrtResult.Flush = checkPoint.redo;
LogwrtResult.Write = checkPoint.redo;
pg_atomic_write_u64(&LogwrtResult.Flush, checkPoint.redo);
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->LogwrtRqst.Write = checkPoint.redo;
XLogCtl->LogwrtRqst.Flush = checkPoint.redo;
// XLogCtl->LogwrtRqst.Flush = checkPoint.redo;
pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), checkPoint.redo);
XLogCtl->ThisTimeLineID = ThisTimeLineID;
//for checkpoint insert into tikv
XLogBeginRead(xlogreader, RecPtr);
@ -9902,12 +9904,13 @@ void StartupXLOG(void)
XLogCtl->InitializedUpTo = EndOfLog;
// }
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
LogwrtResult.Write = EndOfLog;
pg_atomic_write_u64(&LogwrtResult.Flush, EndOfLog);
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), EndOfLog);
LocalSetXLogInsertAllowed();
@ -10654,11 +10657,10 @@ GetInsertRecPtr(void)
XLogRecPtr
GetFlushRecPtr(void)
{
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
return LogwrtResult.Flush;
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
return (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
}
/*