diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 4e21d26..48454dc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4074,7 +4074,7 @@ void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) // WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ; /* if we have already flushed that far, we're done */ - if (WriteRqstPtr <= LogwrtResult.Flush) + if (WriteRqstPtr <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) return; } @@ -4409,7 +4409,7 @@ He3DBXLogFlush(XLogRecPtr record) } /* Quick exit if already known flushed */ - if (record <= LogwrtResult.Flush) + if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) return; #ifdef WAL_DEBUG @@ -4449,7 +4449,7 @@ He3DBXLogFlush(XLogRecPtr record) SpinLockRelease(&XLogCtl->info_lck); /* done already? */ - if (record <= LogwrtResult.Flush) + if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) break; /* @@ -4479,7 +4479,7 @@ He3DBXLogFlush(XLogRecPtr record) SpinLockAcquire(&XLogCtl->info_lck); LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); - if (record <= LogwrtResult.Flush) + if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) { // LWLockRelease(WALWriteLock); break; @@ -4549,11 +4549,11 @@ He3DBXLogFlush(XLogRecPtr record) * calls from bufmgr.c are not within critical sections and so we will not * force a restart for a bad LSN on a data page. */ - if (LogwrtResult.Flush < record) + if ((XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush) < record) elog(ERROR, "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X", LSN_FORMAT_ARGS(record), - LSN_FORMAT_ARGS(LogwrtResult.Flush)); + LSN_FORMAT_ARGS((XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))); } bool @@ -4740,7 +4740,7 @@ He3DBXLogBackgroundFlush(void) WriteRqst.Write = WriteRqst.Write >= XLogCtl->asyncXactLSN ? WriteRqst.Write:XLogCtl->asyncXactLSN; SpinLockRelease(&XLogCtl->info_lck); - if (WriteRqst.Write <= LogwrtResult.Flush) + if (WriteRqst.Write <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) { return false; } @@ -4751,7 +4751,7 @@ He3DBXLogBackgroundFlush(void) */ now = GetCurrentTimestamp(); - flushbytes = WriteRqst.Write - LogwrtResult.Flush; + flushbytes = WriteRqst.Write - (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush); if (WalWriterFlushAfter == 0 || lastflush == 0) { @@ -4799,7 +4799,7 @@ He3DBXLogBackgroundFlush(void) LogwrtResult = XLogCtl->LogwrtResult; SpinLockRelease(&XLogCtl->info_lck); if (WriteRqst.Write > LogwrtResult.Write || - WriteRqst.Flush > LogwrtResult.Flush) + WriteRqst.Flush > (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) { He3DBXLogFakeWrite(WriteRqst); } @@ -4879,7 +4879,7 @@ bool XLogNeedsFlush(XLogRecPtr record) } /* Quick exit if already known flushed */ - if (record <= LogwrtResult.Flush) + if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) return false; /* read LogwrtResult and update local state */ @@ -4888,7 +4888,7 @@ bool XLogNeedsFlush(XLogRecPtr record) SpinLockRelease(&XLogCtl->info_lck); /* check again */ - if (record <= LogwrtResult.Flush) + if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush)) return false; return true; @@ -9245,10 +9245,12 @@ void StartupXLOG(void) Insert->PrevBytePos = checkPoint.redo; Insert->CurrBytePos = checkPoint.redo; XLogCtl->InitializedUpTo = checkPoint.redo; - LogwrtResult.Write = LogwrtResult.Flush = checkPoint.redo; + LogwrtResult.Write = checkPoint.redo; + pg_atomic_write_u64(&LogwrtResult.Flush, checkPoint.redo); XLogCtl->LogwrtResult = LogwrtResult; XLogCtl->LogwrtRqst.Write = checkPoint.redo; - XLogCtl->LogwrtRqst.Flush = checkPoint.redo; + // XLogCtl->LogwrtRqst.Flush = checkPoint.redo; + pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), checkPoint.redo); XLogCtl->ThisTimeLineID = ThisTimeLineID; //for checkpoint insert into tikv XLogBeginRead(xlogreader, RecPtr); @@ -9902,12 +9904,13 @@ void StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; // } - LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; + LogwrtResult.Write = EndOfLog; + pg_atomic_write_u64(&LogwrtResult.Flush, EndOfLog); XLogCtl->LogwrtResult = LogwrtResult; XLogCtl->LogwrtRqst.Write = EndOfLog; - XLogCtl->LogwrtRqst.Flush = EndOfLog; + pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), EndOfLog); LocalSetXLogInsertAllowed(); @@ -10654,11 +10657,10 @@ GetInsertRecPtr(void) XLogRecPtr GetFlushRecPtr(void) { - SpinLockAcquire(&XLogCtl->info_lck); - LogwrtResult = XLogCtl->LogwrtResult; - SpinLockRelease(&XLogCtl->info_lck); - - return LogwrtResult.Flush; + // SpinLockAcquire(&XLogCtl->info_lck); + // LogwrtResult = XLogCtl->LogwrtResult; + // SpinLockRelease(&XLogCtl->info_lck); + return (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush); } /*