Merge remote-tracking branch 'upstream/dev_performance' into dev_performance

zoujia 2023-05-19 17:26:28 +08:00
commit 0c78b9fbb2
14 changed files with 363 additions and 442 deletions

View File

@ -54,6 +54,7 @@ func runArchive(cmd *cobra.Command, args []string) {
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
fmt.Printf("Connect S3 Error!\n%v\n", err)

View File

@ -54,6 +54,7 @@ func runRestore(cmd *cobra.Command, args []string) {
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
fmt.Printf("Connect S3 Error!\n%v\n", err)

View File

@ -68,6 +68,7 @@ func runArchive(cmd *cobra.Command, args []string) {
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
fmt.Printf("Connect S3 Error!\n%v\n", err)

View File

@ -51,6 +51,7 @@ func runBackup(cmd *cobra.Command, args []string) {
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
fmt.Printf("Connect S3 Error!\n%v\n", err)

View File

@ -52,6 +52,7 @@ func runRestore(cmd *cobra.Command, args []string) {
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
fmt.Printf("Connect S3 Error!\n%v\n", err)

View File

@ -0,0 +1,75 @@
#!/bin/bash
# Stand up a He3DB primary from an existing push standby: stop the push instance,
# rsync its data directory (except base/ and global/, which are shared via symlinks)
# into the empty primary data directory, then start the primary and restart the push instance.
export PATH=/home/postgres/psql14/bin:$PATH
primaryDataDir=/home/postgres/primary/pgdata
primaryImdbPageDirectory=/tmp/primarypagedb
primaryImdbWalDirectory=/tmp/primarywaldb
primaryLogfile=/home/postgres/primarylogfile
primaryPort=15432
primaryConninfo='application_name=pushstandby user=repl password=123456 host=127.0.0.1 port=15432 sslmode=disable sslcompression=0 gssencmode=disable target_session_attrs=any'
pushDataDir=/home/postgres/push/pgdata
pushImdbPageDirectory=/tmp/pushpagedb
pushImdbWalDirectory=/tmp/pushwaldb
pushLogfile=/home/postgres/pushlogfile
if [ ! -d "$primaryDataDir" ]; then
echo "$primaryDataDir does not exist!"
exit 1
fi
if [ "`ls -A $primaryDataDir`" != "" ]; then
echo "$primaryDataDir is not enpty!"
exit 1
fi
pg_ctl -D $pushDataDir -l $pushLogfile stop
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB stop failed!"
exit 1
fi
sed -i 's/^primary_conninfo/#primary_conninfo/g' $pushDataDir/postgresql.auto.conf
sed -i 's/^primary_conninfo/#primary_conninfo/g' $pushDataDir/postgresql.conf
sed -i 's/^he3mirror/#he3mirror/g' $pushDataDir/postgresql.conf
rsync -av --exclude base --exclude global --exclude standby.signal --exclude backup_label.old --exclude backup_manifest $pushDataDir/* $primaryDataDir/
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): sync data file failed!"
exit 1
fi
ln -s $pushDataDir/base $primaryDataDir/base
ln -s $pushDataDir/global $primaryDataDir/global
echo -e "primary_conninfo = '$primaryConninfo'" >> $pushDataDir/postgresql.conf
echo -e "he3mirror=false" >> $pushDataDir/postgresql.conf
sed -i 's/^push_standby/#push_standby/g' $primaryDataDir/postgresql.conf
sed -i 's/^hot_standby/#hot_standby/g' $primaryDataDir/postgresql.conf
sed -i 's/^port/#port/g' $primaryDataDir/postgresql.conf
sed -i 's/^lmdb_page_directory/#lmdb_page_directory/g' $primaryDataDir/postgresql.conf
sed -i 's/^lmdb_wal_directory/#lmdb_wal_directory/g' $primaryDataDir/postgresql.conf
echo -e "push_standby=off" >> $primaryDataDir/postgresql.conf
echo -e "hot_standby=off" >> $primaryDataDir/postgresql.conf
echo -e "port=$primaryPort" >> $primaryDataDir/postgresql.conf
echo -e "he3mirror=false" >> $primaryDataDir/postgresql.conf
echo -e "lmdb_page_directory='$primaryImdbPageDirectory'" >> $primaryDataDir/postgresql.conf
echo -e "lmdb_wal_directory='$primaryImdbWalDirectory'" >> $primaryDataDir/postgresql.conf
rm -rf $primaryImdbPageDirectory $primaryImdbWalDirectory $pushImdbPageDirectory $pushImdbWalDirectory
pg_ctl -D $primaryDataDir -l $primaryLogfile start
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB primary instance start failed!"
exit 1
fi
pg_ctl -D $pushDataDir -l $pushLogfile start
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB push instance start failed!"
exit 1
fi

View File

@ -8491,6 +8491,11 @@ heap_xlog_prune(XLogReaderState *record)
}
if (mode != RBM_NORMAL_VALID && BufferIsValid(buffer))
{
UnlockReleaseBuffer(buffer);
}
/*if (mode != RBM_NORMAL_VALID && BufferIsValid(buffer))
{
Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
@ -8505,8 +8510,8 @@ heap_xlog_prune(XLogReaderState *record)
* Do this regardless of a full-page image being applied, since the
* FSM data is not in the page anyway.
*/
XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
/*XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}*/
}
/*
@ -8560,6 +8565,11 @@ heap_xlog_vacuum(XLogReaderState *record)
}
if (BufferIsValid(buffer))
{
UnlockReleaseBuffer(buffer);
}
/*if (BufferIsValid(buffer))
{
Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));
RelFileNode rnode;
@ -8577,8 +8587,8 @@ heap_xlog_vacuum(XLogReaderState *record)
* Do this regardless of a full-page image being applied, since the
* FSM data is not in the page anyway.
*/
XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
/*XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}*/
}
/*
@ -8642,6 +8652,8 @@ heap_xlog_visible(XLogReaderState *record)
PageSetAllVisible(page);
MarkBufferDirty(buffer);
PageSetLSN(page, lsn);
}
else if (action == BLK_RESTORED)
{
@ -8653,6 +8665,11 @@ heap_xlog_visible(XLogReaderState *record)
}
if (BufferIsValid(buffer))
{
UnlockReleaseBuffer(buffer);
}
/*if (BufferIsValid(buffer))
{
Size space = PageGetFreeSpace(BufferGetPage(buffer));
@ -8675,9 +8692,9 @@ heap_xlog_visible(XLogReaderState *record)
* Do this regardless of a full-page image being applied, since the
* FSM data is not in the page anyway.
*/
if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
/*if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
XLogRecordPageWithFreeSpace(rnode, blkno, space);
}
}*/
/*
* Even if we skipped the heap page update due to the LSN interlock, it's
@ -8842,6 +8859,11 @@ he3_heap_xlog_visible(XLogReaderState *record)
}
if (BufferIsValid(buffer))
{
UnlockReleaseBuffer(buffer);
}
/*if (BufferIsValid(buffer))
{
Size space = PageGetFreeSpace(BufferGetPage(buffer));
@ -8864,18 +8886,11 @@ he3_heap_xlog_visible(XLogReaderState *record)
* Do this regardless of a full-page image being applied, since the
* FSM data is not in the page anyway.
*/
if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
/*if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
XLogRecordPageWithFreeSpace(rnode, blkno, space);
}
}*/
break;
}
}
}

View File

@ -495,7 +495,7 @@ typedef struct XLogwrtRqst
typedef struct XLogwrtResult
{
XLogRecPtr Write; /* last byte + 1 written out */
XLogRecPtr Flush; /* last byte + 1 flushed */
pg_atomic_uint64 Flush; /* last byte + 1 flushed */
} XLogwrtResult;
@ -671,7 +671,7 @@ typedef struct FlushInfo
typedef struct XLogParralFlush
{
uint32 begin;
pg_atomic_uint32 begin;
uint32 last;
// uint32 count;
// uint32 diff;
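The two hunks above turn XLogwrtResult.Flush into a pg_atomic_uint64 and XLogParralFlush.begin into a pg_atomic_uint32, so later code can read the flushed LSN without taking info_lck and copying the whole struct. A minimal sketch of that publish/poll pattern follows, using standard C11 atomics as a stand-in for PostgreSQL's pg_atomic_* wrappers; the demo_* names are illustrative only, and the memory orders are an approximation rather than a statement about pg_atomic_read_u64()'s exact barrier semantics.

/* Sketch only: C11 atomics standing in for pg_atomic_uint64; demo_* names are hypothetical. */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

typedef struct
{
    XLogRecPtr           Write;   /* last byte + 1 written out */
    atomic_uint_fast64_t Flush;   /* last byte + 1 flushed, readable lock-free */
} DemoXLogwrtResult;

static DemoXLogwrtResult demo_result;

/* The flusher publishes a new flush position without holding a spinlock. */
static void
demo_publish_flush(XLogRecPtr upto)
{
    atomic_store_explicit(&demo_result.Flush, upto, memory_order_release);
}

/* Readers poll the flush position directly instead of copying the whole
 * struct under a spinlock. */
static XLogRecPtr
demo_read_flush(void)
{
    return (XLogRecPtr) atomic_load_explicit(&demo_result.Flush,
                                             memory_order_acquire);
}

int
main(void)
{
    demo_publish_flush(0x10000);
    printf("flushed to %X\n", (unsigned) demo_read_flush());
    return 0;
}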
@ -879,7 +879,7 @@ static int UsableBytesInSegment;
* Private, possibly out-of-date copy of shared LogwrtResult.
* See discussion above.
*/
static XLogwrtResult LogwrtResult = {0, 0};
static XLogwrtResult LogwrtResult;
/*
* Codes indicating where we got a WAL file from during recovery, or where
@ -991,6 +991,7 @@ static void readRecoverySignalFile(void);
static void validateRecoveryParameters(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog);
static bool recoveryStopsBefore(XLogReaderState *record);
static bool he3recoveryStopsAfter(XLogReaderState *record);
static bool recoveryStopsAfter(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
static void recoveryPausesHere(bool endOfRecovery);
@ -1013,7 +1014,6 @@ static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
static void He3DBAdvanceXLInsertBuffer(int xlogLength,XLogRecPtr upto, bool opportunistic);
static bool XLogCheckpointNeeded(XLogSegNo new_segno);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static void He3DBXLogWrite(XLogwrtRqst WriteRqst, bool flexible);
static void He3DBXLogFakeWrite(XLogwrtRqst WriteRqst);
static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
@ -2673,26 +2673,28 @@ He3DBGetXLogBuffer(int xlogLength,XLogRecPtr ptr)
/* offset of this xlog record within the xlog buffer */
offset = ptr % ((XLOGbuffers-1) * XLOG_BLCKSZ);
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl-> LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl-> LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
/*
* The start position of this xlog record is about to overwrite data in the
* xlog buffer that has not been written out yet.
*/
// WALInsertLockUpdateInsertingAt(ptr);
while (ptr - LogwrtResult.Flush > ((XLOGbuffers-1) * XLOG_BLCKSZ))
XLogRecPtr flushed = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
while (ptr - flushed > ((XLOGbuffers-1) * XLOG_BLCKSZ))
{
usleep(500);
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl -> LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl -> LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
flushed = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
}
/*
* The remaining space in the xlog buffer can hold this xlog record and the
* write has not timed out.
*/
if (!(ptr + xlogLength - XLogCtl -> LogwrtResult.Flush >= ((XLOGbuffers-2) * XLOG_BLCKSZ) /* || isWriteTimeOut()*/))
if (!(ptr + xlogLength - flushed >= ((XLOGbuffers-2) * XLOG_BLCKSZ) /* || isWriteTimeOut()*/))
{
// WALInsertLockUpdateInsertingAt(ptr);
He3DBAdvanceXLInsertBuffer(xlogLength, ptr, true);
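In the hunk above, He3DBGetXLogBuffer stops copying LogwrtResult under info_lck and instead polls the shared flush pointer with pg_atomic_read_u64() while waiting for ring-buffer space. A self-contained sketch of that backpressure loop, with C11 atomics and hypothetical demo_* names (ring_bytes stands in for (XLOGbuffers-1) * XLOG_BLCKSZ):

#include <stdatomic.h>
#include <stdint.h>
#include <unistd.h>

typedef uint64_t XLogRecPtr;

static atomic_uint_fast64_t demo_flushed_upto;   /* last byte + 1 flushed */

/* Poll the atomic flush pointer until writing xlog_len bytes at write_ptr
 * would no longer overwrite bytes that have not yet been flushed out of a
 * ring of ring_bytes. */
static void
demo_wait_for_buffer_space(XLogRecPtr write_ptr, size_t xlog_len,
                           size_t ring_bytes)
{
    XLogRecPtr flushed = atomic_load(&demo_flushed_upto);

    while (write_ptr + xlog_len - flushed > ring_bytes)
    {
        usleep(500);                             /* let the flusher catch up */
        flushed = atomic_load(&demo_flushed_upto);
    }
}

int
main(void)
{
    atomic_store(&demo_flushed_upto, 8192);
    /* 16 kB + 4 kB is only 12 kB ahead of the flusher: returns immediately. */
    demo_wait_for_buffer_space(16384, 4096, 64 * 1024);
    return 0;
}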
@ -2953,7 +2955,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
WriteRqst.Write = OldPageRqstPtr;
WriteRqst.Flush = 0;
XLogWrite(WriteRqst, false);
// XLogWrite(WriteRqst, false);
LWLockRelease(WALWriteLock);
WalStats.m_wal_buffers_full++;
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
@ -3087,6 +3089,8 @@ He3DBAdvanceXLInsertBuffer(int xlogLength, XLogRecPtr upto, bool opportunistic)
*
*/
XLogRecPtr flushed = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
if (!opportunistic)
{
@ -3101,7 +3105,7 @@ He3DBAdvanceXLInsertBuffer(int xlogLength, XLogRecPtr upto, bool opportunistic)
* TODO
*
*/
if (upto + xlogLength - LogwrtResult.Flush >= ((XLOGbuffers-2) * XLOG_BLCKSZ) /* || isWriteTimeOut()*/)
if (upto + xlogLength - flushed >= ((XLOGbuffers-2) * XLOG_BLCKSZ) /* || isWriteTimeOut()*/)
{
/*
* Must acquire write lock. Release WALBufMappingLock first,
@ -3127,7 +3131,7 @@ He3DBAdvanceXLInsertBuffer(int xlogLength, XLogRecPtr upto, bool opportunistic)
// LWLockRelease(WALWriteLock);
// }
// else
if (upto + xlogLength - LogwrtResult.Flush >= ((XLOGbuffers-2) * XLOG_BLCKSZ))
if (upto + xlogLength - flushed >= ((XLOGbuffers-2) * XLOG_BLCKSZ))
{
/* Have to write it ourselves */
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
@ -3147,12 +3151,13 @@ He3DBAdvanceXLInsertBuffer(int xlogLength, XLogRecPtr upto, bool opportunistic)
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
while(upto + xlogLength - LogwrtResult.Flush >= ((XLOGbuffers-1) * XLOG_BLCKSZ))
while(upto + xlogLength - flushed >= ((XLOGbuffers-1) * XLOG_BLCKSZ))
{
usleep(500);
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
flushed = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
}
/*
if (remaindXlogBufferLength >= xlogLength)
@ -3293,303 +3298,6 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
return false;
}
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
* If flexible == true, we don't have to write as far as WriteRqst, but
* may stop at any convenient boundary (such as a cache or logfile boundary).
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
* Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst)
* must be called before grabbing the lock, to make sure the data is ready to
* write.
*/
static void
XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
bool ispartialpage;
bool last_iteration;
bool finishing_seg;
bool use_existent;
int curridx;
int npages;
int startidx;
uint32 startoffset;
/* We should always be inside a critical section here */
Assert(CritSectionCount > 0);
/*
* Update local LogwrtResult (caller probably did this already, but...)
*/
LogwrtResult = XLogCtl->LogwrtResult;
/*
* Since successive pages in the xlog cache are consecutively allocated,
* we can usually gather multiple pages together and issue just one
* write() call. npages is the number of pages we have determined can be
* written together; startidx is the cache block index of the first one,
* and startoffset is the file offset at which it should go. The latter
* two variables are only valid when npages > 0, but we must initialize
* all of them to keep the compiler quiet.
*/
npages = 0;
startidx = 0;
startoffset = 0;
/*
* Within the loop, curridx is the cache block index of the page to
* consider writing. Begin at the buffer containing the next unwritten
* page, or last partially written page.
*/
curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
while (LogwrtResult.Write < WriteRqst.Write)
{
/*
* Make sure we're not ahead of the insert process. This could happen
* if we're passed a bogus WriteRqst.Write that is past the end of the
* last page that's been initialized by AdvanceXLInsertBuffer.
*/
XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
if (LogwrtResult.Write >= EndPtr)
elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
LSN_FORMAT_ARGS(LogwrtResult.Write),
LSN_FORMAT_ARGS(EndPtr));
/* Advance LogwrtResult.Write to end of current buffer page */
LogwrtResult.Write = EndPtr;
ispartialpage = WriteRqst.Write < LogwrtResult.Write;
if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
wal_segment_size))
{
/*
* Switch to new logfile segment. We cannot have any pending
* pages here (since we dump what we have at segment end).
*/
Assert(npages == 0);
if (openLogFile >= 0)
XLogFileClose();
XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
wal_segment_size);
/* create/use new log file */
use_existent = true;
openLogFile = XLogFileInit(openLogSegNo, &use_existent, true);
ReserveExternalFD();
}
/* Make sure we have the current logfile open */
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
wal_segment_size);
openLogFile = XLogFileOpen(openLogSegNo);
ReserveExternalFD();
}
/* Add current page to the set of pending pages-to-dump */
if (npages == 0)
{
/* first of group */
startidx = curridx;
startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
wal_segment_size);
}
npages++;
/*
* Dump the set if this will be the last loop iteration, or if we are
* at the last page of the cache area (since the next page won't be
* contiguous in memory), or if we are at the end of the logfile
* segment.
*/
last_iteration = WriteRqst.Write <= LogwrtResult.Write;
finishing_seg = !ispartialpage &&
(startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
if (last_iteration ||
curridx == XLogCtl->XLogCacheBlck ||
finishing_seg)
{
char *from;
Size nbytes;
Size nleft;
int written;
instr_time start;
/* OK to write the page(s) */
from = XLogCtl->pages + startidx * (Size)XLOG_BLCKSZ;
nbytes = npages * (Size)XLOG_BLCKSZ;
nleft = nbytes;
do
{
errno = 0;
/* Measure I/O timing to write WAL data */
if (track_wal_io_timing)
INSTR_TIME_SET_CURRENT(start);
pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
written = pg_pwrite(openLogFile, from, nleft, startoffset);
pgstat_report_wait_end();
/*
* Increment the I/O timing and the number of times WAL data
* were written out to disk.
*/
if (track_wal_io_timing)
{
instr_time duration;
INSTR_TIME_SET_CURRENT(duration);
INSTR_TIME_SUBTRACT(duration, start);
WalStats.m_wal_write_time += INSTR_TIME_GET_MICROSEC(duration);
}
WalStats.m_wal_write++;
if (written <= 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno;
if (errno == EINTR)
continue;
save_errno = errno;
XLogFileName(xlogfname, ThisTimeLineID, openLogSegNo,
wal_segment_size);
errno = save_errno;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log file %s "
"at offset %u, length %zu: %m",
xlogfname, startoffset, nleft)));
}
nleft -= written;
from += written;
startoffset += written;
} while (nleft > 0);
npages = 0;
/*
* If we just wrote the whole last page of a logfile segment,
* fsync the segment immediately. This avoids having to go back
* and re-open prior segments when an fsync request comes along
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage, and to update the
* timer for archive_timeout, and to signal for a checkpoint if
* too many logfile segments have been used since the last
* checkpoint.
*/
if (finishing_seg)
{
issue_xlog_fsync(openLogFile, openLogSegNo);
/* signal that we need to wakeup walsenders later */
WalSndWakeupRequest();
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogSegNo);
XLogCtl->lastSegSwitchTime = (pg_time_t)time(NULL);
XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
/*
* Request a checkpoint if we've consumed too much xlog since
* the last one. For speed, we first check using the local
* copy of RedoRecPtr, which might be out of date; if it looks
* like a checkpoint is needed, forcibly update RedoRecPtr and
* recheck.
*/
if (IsUnderPostmaster && XLogCheckpointNeeded(openLogSegNo))
{
(void)GetRedoRecPtr();
if (XLogCheckpointNeeded(openLogSegNo))
RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
}
}
}
if (ispartialpage)
{
/* Only asked to write a partial page */
LogwrtResult.Write = WriteRqst.Write;
break;
}
curridx = NextBufIdx(curridx);
/* If flexible, break out of loop as soon as we wrote something */
if (flexible && npages == 0)
break;
}
Assert(npages == 0);
/*
* If asked to flush, do so
*/
if (LogwrtResult.Flush < WriteRqst.Flush &&
LogwrtResult.Flush < LogwrtResult.Write)
{
/*
* Could get here without iterating above loop, in which case we might
* have no open file or the wrong one. However, we do not need to
* fsync more than one file.
*/
if (sync_method != SYNC_METHOD_OPEN &&
sync_method != SYNC_METHOD_OPEN_DSYNC)
{
if (openLogFile >= 0 &&
!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
wal_segment_size))
XLogFileClose();
if (openLogFile < 0)
{
XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
wal_segment_size);
openLogFile = XLogFileOpen(openLogSegNo);
ReserveExternalFD();
}
issue_xlog_fsync(openLogFile, openLogSegNo);
}
/* signal that we need to wakeup walsenders later */
WalSndWakeupRequest();
LogwrtResult.Flush = LogwrtResult.Write;
}
/*
* Update shared-memory status
*
* We make sure that the shared 'request' values do not fall behind the
* 'result' values. This is not absolutely essential, but it saves some
* code in a couple of places.
*/
{
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->LogwrtResult = LogwrtResult;
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
SpinLockRelease(&XLogCtl->info_lck);
}
}
int GetXlogLength(uint8 part[],int n)
{
@ -3854,7 +3562,7 @@ mustflush:
// printf("flushwals time is %ld\n",timenow - timestp);
freeItemList(xlogItemList);
WalStats.m_wal_write++;
LogwrtResult.Write = WriteRqst.Write;
// LogwrtResult.Write = WriteRqst.Write;
}
free(xlogItemList);
xlogItemList = NULL;
@ -3869,18 +3577,15 @@ mustflush:
// long unitflush;
int bRelativeOffset = 0;
int eRelativeOffset = 0;
// SpinLockAcquire(&XLogCtl->info_lck);
// XLogParralFlush flushInfo = XLogCtl->LogFlush;
// SpinLockRelease(&XLogCtl->info_lck);
// printf("end flush wals, begin %d, curLoc %d, WriteRqst.Write %ld\n", flushInfo.begin, curLoc, WriteRqst.Write);
while (XLogCtl->LogFlush.begin < curLoc)
while (pg_atomic_read_u32(&XLogCtl->LogFlush.begin) < curLoc)
{
pg_usleep(20L);
// SpinLockAcquire(&XLogCtl->info_lck);
// flushInfo = XLogCtl->LogFlush;
// SpinLockRelease(&XLogCtl->info_lck);
}
/*
@ -3909,13 +3614,15 @@ mustflush:
} else
XLogCtl->LogFlush.diff = diff;
*/
pg_atomic_write_u32(&XLogCtl->LogFlush.begin, curLoc+1);
// SpinLockAcquire(&XLogCtl->info_lck);
// XLogCtl->LogFlush.begin = curLoc+1;
// XLogCtl->oldflush = XLogCtl->LogwrtResult.Flush;
// SpinLockRelease(&XLogCtl->info_lck);
bRelativeOffset = LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ);
eRelativeOffset = WriteRqst.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ);
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->LogFlush.begin = curLoc+1;
XLogCtl->oldflush = XLogCtl->LogwrtResult.Flush;
XLogCtl->LogwrtResult.Flush = WriteRqst.Write;
bRelativeOffset = XLogCtl->oldflush % ((XLOGbuffers-1) * XLOG_BLCKSZ);
eRelativeOffset = XLogCtl->LogwrtResult.Flush % ((XLOGbuffers-1) * XLOG_BLCKSZ);
if (bRelativeOffset <= eRelativeOffset)
{
MemSet((char *)(XLogCtl->pages + bRelativeOffset), 0, eRelativeOffset - bRelativeOffset); }
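The surrounding hunks drop the spinlock around LogFlush and order the flushers lock-free instead: each one polls pg_atomic_read_u32(&XLogCtl->LogFlush.begin) until its own slot (curLoc) comes up, does its flush, then hands the turn on with pg_atomic_write_u32(..., curLoc+1). A compact sketch of that ticket-style hand-off, again with C11 atomics and hypothetical demo_* names:

#include <stdatomic.h>
#include <unistd.h>

static atomic_uint demo_flush_begin;     /* slot whose turn it currently is */

/* Wait for my_slot's turn, do the slot's work, then pass the turn on. */
static void
demo_flush_in_order(unsigned my_slot)
{
    while (atomic_load(&demo_flush_begin) < my_slot)
        usleep(20);                      /* mirrors the pg_usleep(20L) above */

    /* ... this slot's flush work would go here ... */

    atomic_store(&demo_flush_begin, my_slot + 1);
}

int
main(void)
{
    demo_flush_in_order(0);              /* slot 0 proceeds immediately */
    demo_flush_in_order(1);              /* then slot 1 */
    return 0;
}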
@ -3938,16 +3645,19 @@ mustflush:
// XLogCtl->ol = XLogCtl->LogwrtResult.Flush;
// }
SpinLockAcquire(&XLogCtl->info_lck);
XLogRecPtr flush = pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
if (flush < WriteRqst.Write)
pg_atomic_write_u64(&(XLogCtl->LogwrtResult.Flush), WriteRqst.Write);
SpinLockRelease(&XLogCtl->info_lck);
}
//return;
}
}
SpinLockAcquire(&XLogCtl->info_lck);
XLogRecPtr flush = XLogCtl->LogwrtResult.Flush;
SpinLockRelease(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// XLogRecPtr flush = XLogCtl->LogwrtResult.Flush;
// SpinLockRelease(&XLogCtl->info_lck);
XLogRecPtr flush = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
long waitTime = 0;
while (flush < WriteRqst.Flush)
{
@ -3989,11 +3699,12 @@ mustflush:
goto mustflush;
}
SpinLockAcquire(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// if (curLoc == (XLogCtl->LogFlush.begin))
// elog(PANIC, "should be in here, WriteRqst.Write %ld", WriteRqst.Write);
flush = XLogCtl->LogwrtResult.Flush;
SpinLockRelease(&XLogCtl->info_lck);
// flush = XLogCtl->LogwrtResult.Flush;
// SpinLockRelease(&XLogCtl->info_lck);
flush = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
}
@ -4002,30 +3713,32 @@ mustflush:
static void
He3DBXLogFakeWrite(XLogwrtRqst WriteRqst)
{
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
// printf("invoke xlog write, upto %ld\n", WriteRqst.Write);
if ((LogwrtResult.Write <= WriteRqst.Write - (XLOGbuffers-3) * XLOG_BLCKSZ) ||
(LogwrtResult.Flush < WriteRqst.Flush))
((XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush) < WriteRqst.Flush))
{
// printf("invoke xlogwrite result write %ld, flush %ld, request write %d, flush %ld\n",
// LogwrtResult.Write, LogwrtResult.Flush, WriteRqst.Write, WriteRqst.Flush);
FlushWal(WriteRqst);
LogwrtResult.Write = LogwrtResult.Flush = WriteRqst.Write;
LogwrtResult.Write = WriteRqst.Write;
pg_atomic_write_u64(&LogwrtResult.Flush, WriteRqst.Write);
}
/*
* Update shared-memory status
*/
{
XLogRecPtr flushed = (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
SpinLockAcquire(&XLogCtl->info_lck);
// XLogCtl->LogwrtResult = LogwrtResult;
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
if (XLogCtl->LogwrtRqst.Flush < flushed)
XLogCtl->LogwrtRqst.Flush = flushed;
XLogCtl->globalUpto = 0;
SpinLockRelease(&XLogCtl->info_lck);
}
@ -4059,7 +3772,7 @@ void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
// WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
/* if we have already flushed that far, we're done */
if (WriteRqstPtr <= LogwrtResult.Flush)
if (WriteRqstPtr <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return;
}
@ -4394,7 +4107,7 @@ He3DBXLogFlush(XLogRecPtr record)
}
/* Quick exit if already known flushed */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return;
#ifdef WAL_DEBUG
@ -4434,7 +4147,7 @@ He3DBXLogFlush(XLogRecPtr record)
SpinLockRelease(&XLogCtl->info_lck);
/* done already? */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
break;
/*
@ -4461,10 +4174,10 @@ He3DBXLogFlush(XLogRecPtr record)
// }
/* Got the lock; recheck whether request is satisfied */
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
if (record <= LogwrtResult.Flush)
// SpinLockAcquire(&XLogCtl->info_lck);
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
if (record <= (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush)))
{
// LWLockRelease(WALWriteLock);
break;
@ -4534,11 +4247,12 @@ He3DBXLogFlush(XLogRecPtr record)
* calls from bufmgr.c are not within critical sections and so we will not
* force a restart for a bad LSN on a data page.
*/
if (LogwrtResult.Flush < record)
XLogRecPtr flushed = (XLogRecPtr) pg_atomic_read_u64(&(XLogCtl->LogwrtResult.Flush));
if ( flushed < record)
elog(ERROR,
"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
LSN_FORMAT_ARGS(flushed));
}
bool
@ -4725,7 +4439,7 @@ He3DBXLogBackgroundFlush(void)
WriteRqst.Write = WriteRqst.Write >= XLogCtl->asyncXactLSN ? WriteRqst.Write:XLogCtl->asyncXactLSN;
SpinLockRelease(&XLogCtl->info_lck);
if (WriteRqst.Write <= LogwrtResult.Flush)
if (WriteRqst.Write <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
{
return false;
}
@ -4736,7 +4450,7 @@ He3DBXLogBackgroundFlush(void)
*/
now = GetCurrentTimestamp();
flushbytes = WriteRqst.Write - LogwrtResult.Flush;
flushbytes = WriteRqst.Write - (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
if (WalWriterFlushAfter == 0 || lastflush == 0)
{
@ -4784,7 +4498,7 @@ He3DBXLogBackgroundFlush(void)
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
if (WriteRqst.Write > LogwrtResult.Write ||
WriteRqst.Flush > LogwrtResult.Flush)
WriteRqst.Flush > (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
{
He3DBXLogFakeWrite(WriteRqst);
}
@ -4864,7 +4578,7 @@ bool XLogNeedsFlush(XLogRecPtr record)
}
/* Quick exit if already known flushed */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return false;
/* read LogwrtResult and update local state */
@ -4873,7 +4587,7 @@ bool XLogNeedsFlush(XLogRecPtr record)
SpinLockRelease(&XLogCtl->info_lck);
/* check again */
if (record <= LogwrtResult.Flush)
if (record <= (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush))
return false;
return true;
@ -7382,8 +7096,8 @@ recoveryStopsBefore(XLogReaderState *record)
* Ignore recovery target settings when not in archive recovery (meaning
* we are in crash recovery).
*/
//if (!ArchiveRecoveryRequested)
// return false;
if (!ArchiveRecoveryRequested)
return false;
/* Check if we should stop as soon as reaching consistency */
if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE && reachedConsistency)
@ -7511,6 +7225,89 @@ recoveryStopsBefore(XLogReaderState *record)
return stopsHere;
}
static bool
he3recoveryStopsAfter(XLogReaderState *record)
{
bool stopsHere = false;
uint8 xact_info;
bool isCommit;
TimestampTz recordXtime = 0;
TransactionId recordXid;
/* We only consider stopping at COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
if (xact_info == XLOG_XACT_COMMIT)
{
isCommit = true;
recordXid = XLogRecGetXid(record);
}
else if (xact_info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit *xlrec = (xl_xact_commit *)XLogRecGetData(record);
xl_xact_parsed_commit parsed;
isCommit = true;
ParseCommitRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else if (xact_info == XLOG_XACT_ABORT)
{
isCommit = false;
recordXid = XLogRecGetXid(record);
}
else if (xact_info == XLOG_XACT_ABORT_PREPARED)
{
xl_xact_abort *xlrec = (xl_xact_abort *)XLogRecGetData(record);
xl_xact_parsed_abort parsed;
isCommit = false;
ParseAbortRecord(XLogRecGetInfo(record),
xlrec,
&parsed);
recordXid = parsed.twophase_xid;
}
else
return false;
if (recoveryTarget == RECOVERY_TARGET_TIME &&
getRecordTimestamp(record, &recordXtime))
{
stopsHere = (recordXtime >= recoveryTargetTime);
}
if (stopsHere)
{
recoveryStopAfter = false;
recoveryStopXid = recordXid;
recoveryStopTime = recordXtime;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopName[0] = '\0';
if (isCommit)
{
ereport(LOG,
(errmsg("recovery stopping before commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else
{
ereport(LOG,
(errmsg("recovery stopping before abort of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
}
return stopsHere;
}
/*
* Same as recoveryStopsBefore, but called after applying the record.
*
@ -8204,6 +8001,18 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en
bool valid;
BufferDesc *buf;
/* See if the block is in the buffer pool already */
// for a pg master with an he3db slave, or for backup restore
SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
smgrcreate(smgr, forknum, true);
BlockNumber blockNum = startupsmgrnblocks(smgr, forknum);
static char blkspace[BLCKSZ] = {0};
if (blockNum != P_NEW) {
for (int i = blockNum;i<=blkno;i++) {
smgrextend(smgr,forknum,i,blkspace,false);
}
} else {
elog(PANIC,"data_buffer_for_replay blockNum is P_NEW");
}
LWLockAcquire(partition_lock, LW_SHARED);
buf_id = BufTableLookup(&tag, hash);
/* If page is in buffer, we can apply record, otherwise we do nothing */
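The block added above guarantees that the page being replayed physically exists before the buffer lookup: startupsmgrnblocks() reports how many blocks the relation currently has, and smgrextend() appends zero-filled pages up to and including blkno. A stand-alone sketch of the same pre-extension idea, using plain POSIX file I/O instead of the smgr layer (the demo_* names and the demo_rel path are illustrative only):

#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

#define DEMO_BLCKSZ 8192

/* Grow a relation file with zero-filled pages so that target_blkno exists;
 * returns 0 on success, -1 on error. */
static int
demo_extend_to_block(const char *path, uint32_t target_blkno)
{
    static char zeropage[DEMO_BLCKSZ];   /* all-zero page image */
    int         fd = open(path, O_RDWR | O_CREAT, 0600);
    off_t       size;
    uint32_t    nblocks;

    if (fd < 0)
        return -1;
    size = lseek(fd, 0, SEEK_END);       /* current relation size in bytes */
    nblocks = (uint32_t) (size / DEMO_BLCKSZ);
    for (uint32_t blk = nblocks; blk <= target_blkno; blk++)
    {
        if (pwrite(fd, zeropage, DEMO_BLCKSZ,
                   (off_t) blk * DEMO_BLCKSZ) != DEMO_BLCKSZ)
        {
            close(fd);
            return -1;
        }
    }
    close(fd);
    return 0;
}

int
main(void)
{
    /* make sure ./demo_rel has at least 17 blocks (0..16) */
    return demo_extend_to_block("demo_rel", 16) == 0 ? 0 : 1;
}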
@ -8221,11 +8030,6 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en
}
updateLastReplayLsn(record);
ReleaseBuffer(buffer);
/*if (EnableHotStandby == true && push_standby == false) {
if (StartBufferIO(buf, true)) {
pageInMemoryFlushBufferToDisk(&tag);
}
}*/
} else {
updateLastReplayLsn(record);
LWLockRelease(partition_lock);
@ -8927,12 +8731,14 @@ void StartupXLOG(void)
if (backupFromStandby)
{
/*
if (dbstate_at_startup != DB_IN_ARCHIVE_RECOVERY &&
dbstate_at_startup != DB_SHUTDOWNED_IN_RECOVERY)
ereport(FATAL,
(errmsg("backup_label contains data inconsistent with control file"),
errhint("This means that the backup is corrupted and you will "
"have to use another backup for recovery.")));
*/
ControlFile->backupEndPoint = ControlFile->minRecoveryPoint;
}
}
@ -9147,10 +8953,12 @@ void StartupXLOG(void)
Insert->PrevBytePos = checkPoint.redo;
Insert->CurrBytePos = checkPoint.redo;
XLogCtl->InitializedUpTo = checkPoint.redo;
LogwrtResult.Write = LogwrtResult.Flush = checkPoint.redo;
LogwrtResult.Write = checkPoint.redo;
pg_atomic_write_u64(&LogwrtResult.Flush, checkPoint.redo);
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->LogwrtRqst.Write = checkPoint.redo;
XLogCtl->LogwrtRqst.Flush = checkPoint.redo;
// XLogCtl->LogwrtRqst.Flush = checkPoint.redo;
pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), checkPoint.redo);
XLogCtl->ThisTimeLineID = ThisTimeLineID;
//for checkpoint insert into tikv
XLogBeginRead(xlogreader, RecPtr);
@ -9461,6 +9269,15 @@ void StartupXLOG(void)
}
g_redoStartLsn[2] = record->xl_end;
g_redoStartLsn[1] = record->xl_end - record->xl_tot_len;
if (he3_point_in_time_recovery && he3recoveryStopsAfter(xlogreader))
{
DelRangeWals(1, xlogreader->EndRecPtr, 18446744073709551615);
pushTikv(0,hashMapSize(),true);
while (ReadRecord(xlogreader, LOG, false) != NULL) {
}
break;
}
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
@ -9795,12 +9612,13 @@ void StartupXLOG(void)
XLogCtl->InitializedUpTo = EndOfLog;
// }
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
LogwrtResult.Write = EndOfLog;
pg_atomic_write_u64(&LogwrtResult.Flush, EndOfLog);
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
pg_atomic_write_u64(&(XLogCtl->LogwrtRqst.Flush), EndOfLog);
LocalSetXLogInsertAllowed();
@ -10087,7 +9905,8 @@ CheckRecoveryConsistency(void)
if (XLogRecPtrIsInvalid(minRecoveryPoint) && !he3_point_in_time_recovery)
return;
Assert(InArchiveRecovery);
if (!he3_point_in_time_recovery)
Assert(InArchiveRecovery);
/*
* assume that we are called in the startup process, and hence don't need
@ -10549,8 +10368,7 @@ GetFlushRecPtr(void)
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
return LogwrtResult.Flush;
return (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
}
/*
@ -13005,6 +12823,8 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
recptr = XLogCtl->lastFpwDisableRecPtr;
SpinLockRelease(&XLogCtl->info_lck);
/*
*for he3db pg_basebackup
if (!checkpointfpw || startpoint <= recptr)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@ -13014,6 +12834,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
"is corrupt and should not be used. "
"Enable full_page_writes and run CHECKPOINT on the primary, "
"and then try an online backup again.")));
*/
/*
* During recovery, since we don't use the end-of-backup WAL
@ -13589,6 +13410,8 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
recptr = XLogCtl->lastFpwDisableRecPtr;
SpinLockRelease(&XLogCtl->info_lck);
/*
*for he3db pg_basebackup
if (startpoint <= recptr)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@ -13598,6 +13421,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
"is corrupt and should not be used. "
"Enable full_page_writes and run CHECKPOINT on the primary, "
"and then try an online backup again.")));
*/
LWLockAcquire(ControlFileLock, LW_SHARED);
stoppoint = ControlFile->minRecoveryPoint;

View File

@ -883,6 +883,8 @@ XLogRecData *DecodeXLogRecordAssemble(XLogReaderState *state, OldXLogRecord *rec
scratch += SizeOfXLogRecord;
if (state->max_block_id >= 0) {
DecodedBkpBlock *blkbuf = &state->blocks[block_id];
if (!blkbuf->in_use)
continue;
XLogRecData* bkp_rdatas = g_bkp_rdatas[block_id];
XLogRecordBlockHeader bkpb;

View File

@ -336,7 +336,10 @@ XLogRecord *
StartupXLogReadRecord(XLogReaderState *state, char **errormsg)
{
if (grouo_rec_cur_count < grouo_rec_count) {
state->blocks[0] = state->blocks[grouo_rec_cur_count];
uint8_t blkidx = grouphead[grouo_rec_cur_count]->blocknum;
DecodedBkpBlock blk = state->blocks[0];
state->blocks[0] = state->blocks[blkidx];
state->blocks[blkidx] = blk;
state->decoded_record = grouphead[grouo_rec_cur_count];
return grouphead[grouo_rec_cur_count++];
}
@ -682,6 +685,10 @@ StartupXLogReadRecord(XLogReaderState *state, char **errormsg)
if (state->max_block_id > 0) {
state->max_block_id = 0;
}
uint8_t blkidx = grouphead[grouo_rec_cur_count]->blocknum;
DecodedBkpBlock blk = state->blocks[0];
state->blocks[0] = state->blocks[blkidx];
state->blocks[blkidx] = blk;
state->decoded_record = grouphead[grouo_rec_cur_count];
return grouphead[grouo_rec_cur_count++];
} else
@ -1997,7 +2004,8 @@ DecodeOldXLogRecord(XLogReaderState *state, OldXLogRecord *record, char **errorm
uint32 datatotal;
RelFileNode *rnode = NULL;
uint8 block_id;
static int lastMaxIdx = XLR_MAX_BLOCK_ID;
state->max_block_id = lastMaxIdx;
ResetDecoder(state);
state->decoded_record = NULL;
@ -2011,7 +2019,7 @@ DecodeOldXLogRecord(XLogReaderState *state, OldXLogRecord *record, char **errorm
return false;
}
remaining = record->xl_tot_len - SizeOfOldXLogRecord;
/* Decode the headers */
datatotal = 0;
while (remaining > datatotal)
@ -2215,7 +2223,7 @@ DecodeOldXLogRecord(XLogReaderState *state, OldXLogRecord *record, char **errorm
* the data for the convenience of the callers. Backup images are not
* copied, however; they don't need alignment.
*/
lastMaxIdx = state->max_block_id;
/* block data first */
for (block_id = 0; block_id <= state->max_block_id; block_id++)
{

View File

@ -1067,9 +1067,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (isExtend)
{
/* new buffers are zero-filled */
MemSet((char *) bufBlock, 0, BLCKSZ);
/* don't set checksum for all-zero page */
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
/* don't set checksum for all-zero page */
/* for new page precache */
if (*preCacheNodesCountPtr > 0)
@ -1085,42 +1087,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
preCacheNodei++;
}
}
// Parallel replay (PageFlushWorkerMain => ProcFlushBufferToDisk => XLogReadBufferExtended)
// uses RBM_NORMAL by default; initializing the page there with RBM_ZERO_AND_LOCK would leave
// it invalid, so the page must be smgrextend'ed and then smgrread. The push standby can
// replay an RBM_ZERO base page via ReadWalsByPage, but a slave must ensure the minimum LSN
// of its flushed pages is beyond the consistency point provided by the push standby;
// otherwise the slave must not replay, because the WAL list may already have been truncated
// and replay would produce wrong data.
if (!((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
!isLocalBuf) && IsBootstrapProcessingMode() != true && InitdbSingle != true)
{
if (EnableHotStandby == true || InRecovery) {
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
replayLsn = GetXLogReplayRecPtr(&tli);
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
if ((EnableHotStandby == true && push_standby == false) || he3mirror) {
if (head->next!=NULL) {
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
}
} else {
LsnNode* next = head->next;
if (next!=NULL) {
walRecord.cap = 8192;
walRecord.buf = malloc(walRecord.cap);
}
while(next!=NULL) {
int count = walRecordQuery(&walRecord.buf,&walRecord.count,&walRecord.cap,next->lsn);
if (count == -1) {
elog(FATAL,"======walRecordQuery query wal Faild %X/%X===1===",LSN_FORMAT_ARGS(next->lsn));
}
next = next->next;
}
}
}
}
/*
* NB: we're *not* doing a ScheduleBufferTagForWriteback here;
@ -1281,7 +1247,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
char *xlogStart = NULL;
if (pageXlogPtr != NULL) {
if (pageXlogPtr != NULL && !he3mirror) {
xlogStart = pageXlogPtr + BLCKSZ;
nbytes = nbytes - BLCKSZ;
if (walRecord.count != 0) {

View File

@ -642,6 +642,28 @@ smgrnblocks(SMgrRelation reln, ForkNumber forknum)
return result;
}
/*
* startupsmgrnblocks() -- Calculate the number of blocks in the
* supplied relation.
*/
BlockNumber
startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum)
{
BlockNumber result;
/* Check and return if we get the cached value for the number of blocks. */
result = smgrnblocks_cached(reln, forknum);
if (result != InvalidBlockNumber)
return result;
result = smgrsw[reln->smgr_which].smgr_nblocks(reln, forknum);
reln->smgr_cached_nblocks[forknum] = result;
return result;
}
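startupsmgrnblocks() above is a check-the-cache-then-compute-and-cache helper. A minimal sketch of that pattern, with hypothetical demo_* types standing in for SMgrRelation and the smgr_nblocks() callback:

#include <stdint.h>
#include <stdio.h>

#define DEMO_INVALID_BLOCK UINT32_MAX

typedef struct
{
    uint32_t cached_nblocks;   /* DEMO_INVALID_BLOCK when not yet known */
} DemoRelation;

/* Stand-in for the storage-manager callback that counts blocks. */
static uint32_t
demo_storage_nblocks(DemoRelation *rel)
{
    (void) rel;
    return 128;
}

static uint32_t
demo_nblocks(DemoRelation *rel)
{
    if (rel->cached_nblocks != DEMO_INVALID_BLOCK)
        return rel->cached_nblocks;              /* served from the cache */

    rel->cached_nblocks = demo_storage_nblocks(rel);
    return rel->cached_nblocks;
}

int
main(void)
{
    DemoRelation rel = { DEMO_INVALID_BLOCK };

    printf("%u blocks (computed)\n", demo_nblocks(&rel));
    printf("%u blocks (cached)\n", demo_nblocks(&rel));
    return 0;
}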
/*
* smgrnblocks_cached() -- Get the cached number of blocks in the supplied
* relation.

View File

@ -221,20 +221,23 @@ SyncPostCheckpoint(void)
break;
/* Unlink the file */
if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
path) < 0)
if (push_standby)
{
/*
* There's a race condition, when the database is dropped at the
* same time that we process the pending unlink requests. If the
* DROP DATABASE deletes the file before we do, we will get ENOENT
* here. rmtree() also has to ignore ENOENT errors, to deal with
* the possibility that we delete the file first.
*/
if (errno != ENOENT)
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
if (syncsw[entry->tag.handler].sync_unlinkfiletag(&entry->tag,
path) < 0)
{
/*
* There's a race condition, when the database is dropped at the
* same time that we process the pending unlink requests. If the
* DROP DATABASE deletes the file before we do, we will get ENOENT
* here. rmtree() also has to ignore ENOENT errors, to deal with
* the possibility that we delete the file first.
*/
if (errno != ENOENT)
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
}
}
/* Mark the list entry as canceled, just in case */

View File

@ -101,6 +101,7 @@ extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern BlockNumber startupsmgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
int nforks, BlockNumber *nblocks);