diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cdeb7dc..b3e7847 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8088,6 +8088,31 @@ void pushTikv(int onePageListLen,int pageNum,bool flag) } } +static void pageInMemoryFlushBufferToDisk(BufferTag*tag) { + Buffer buffer = XLogReadBufferExtended(tag->rnode, tag->forkNum, tag->blockNum, + RBM_NORMAL); + if (!BufferIsValid(buffer)) + { + elog(ERROR,"pageInMemoryFlushBufferToDisk is invalid rel %d,flk %d,blk %d",tag->rnode.relNode,tag->forkNum,tag->blockNum); + return; + } + //slave no need to flush disk + if (push_standby == true) { + BufferDesc *buf; + buf = GetBufferDescriptor(buffer-1); + uint32 buf_state = pg_atomic_read_u32(&buf->state); + if (buf_state & BM_DIRTY) { + LWLockAcquire(BufferDescriptorGetContentLock(buf), + LW_SHARED); + FlushOneBuffer(buffer); + LWLockRelease(BufferDescriptorGetContentLock(buf)); + ScheduleBufferTagForWriteback(&BackendWritebackContext, + &buf->tag); + } + } + ReleaseBuffer(buffer); +} + static bool data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr endLsn) { @@ -8135,6 +8160,9 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en UnlockBufHdr(buf, buf_state); } updateLastReplayLsn(); + if (EnableHotStandby == true && push_standby == false) { + pageInMemoryFlushBufferToDisk(&tag); + } ReleaseBuffer(buffer); } else { updateLastReplayLsn(); @@ -14372,7 +14400,10 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL if (gRingBufferManger->maxIdx != 0 ) { ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx); gRingBufferManger->maxIdx = 0; - pushFlag = false; + if (pushFlag == true) { + pushFlag = false; + return -1; + } } } if (1 == ring_buffer_peek(gRingBufferManger,&FirstData,gRingBufferManger->maxIdx)) {