truncate wait pushstandby push to xlog checkpoint

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-06-14 20:54:13 +08:00
parent 3f5feb7c81
commit 65fe12841c
3 changed files with 35 additions and 12 deletions

View File

@ -195,8 +195,15 @@ XLogRecPtr QueryMinLsn(XLogRecPtr lsn)
return 1;
}
}
//no slave,pushstandby no need wait
}
else if (PQresultStatus(pgres) != PGRES_COMMAND_OK)
else if (PQresultStatus(pgres) == PGRES_FATAL_ERROR)
{
//master crash,pushstandby need replay to master crash point for private
PQclear(pgres);
return InvalidXLogRecPtr;
}
else
{
PQfinish(pushconn);
pushconn = NULL;

View File

@ -14534,13 +14534,15 @@ static bool GetShutDownStatus(void) {
return false;
}
static int
static int consumerFailedNum = 0;
static XLogRecPtr reStartPtr = 0;
consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqLen,
char *readBuf)
{
wal_batch_t *FirstData = NULL;
int timeCount = 0;
bool pushFlag = false;
reStartPtr = startPtr;
do {
if (gRingBufferManger->maxIdx > spaceNum/16 || timeCount > 1000 || pushFlag == true) {
if (push_standby == true || EnableHotStandby == false) {
@ -14550,6 +14552,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx);
gRingBufferManger->maxIdx = 0;
if (pushFlag == true) {
consumerFailedNum++;
return -1;
}
}
@ -14561,6 +14564,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
xlogreader->readLen = FirstData->dataLen;
gRingBufferManger->maxIdx++;
FileCheckPointPtr = FirstData->checkPointLsn;
consumerFailedNum = 0;
return FirstData->dataLen;
} else if (startPtr > FirstData->startLsn) {
if (FirstData->startLsn == 0) {
@ -14581,6 +14585,9 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
elog(FATAL,"stream startPtr %X gt FirstData.startLsn %X",FirstData->startLsn,startPtr);
return -1;
}
} else {
pg_usleep(1000);
continue;
}
} else {
timeCount++;
@ -14705,6 +14712,8 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
if (readedUpto == 0) {
readedUpto = startPtr;
}
//failed to connect master more than 8
int failedCount = 0;
while(GetShutDownStatus() == false) {
if (!WaitForWALToBecomeAvailable(readedUpto + reqLen,
private->randAccess,
@ -14717,13 +14726,22 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
XLogRecPtr maxFlushedUpto = 0;
//storage batch is 16k batch,we has 8 of pthreads
if (!EnableHotStandby || LocalPromoteIsTriggered) {
if (!EnableHotStandby || LocalPromoteIsTriggered || (failedCount > 8 && consumerFailedNum < 8)) {
maxFlushedUpto = readedUpto + 8 * 16 * 1024;
} else {
if (readedUpto < flushedUpto) {
maxFlushedUpto = flushedUpto;
} else if (flushedUpto == 0 || !WalRcvStreaming()) {
lastSourceFailed = true;
//reConnect to master
if (ring_buffer_is_empty(gRingBufferManger) && consumerFailedNum >= 8) {
if (!XLogRecPtrIsInvalid(reStartPtr)) {
readedUpto = reStartPtr;
consumerFailedNum = 0;
}
} else {
if (readedUpto < flushedUpto) {
maxFlushedUpto = flushedUpto;
} else if (flushedUpto == 0 || !WalRcvStreaming()) {
failedCount++;
lastSourceFailed = true;
}
}
}
@ -14745,7 +14763,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
walElem.endLsn = end;
wal_batch_t *curElem = NULL;
while((curElem = ring_buffer_queue(gRingBufferManger,walElem)) == NULL) {
usleep(10);
pg_usleep(100000);
if (IsFreePthreadPool()){
return 0;
}
@ -14755,7 +14773,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
if (readedUpto < end) {
readedUpto = end;
} else {
usleep(10);
pg_usleep(100000);
}
}
return 0;

View File

@ -1120,9 +1120,7 @@ restart:
if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
pushTikv(0,hashMapSize(),true);
PushCheckPointGuts(CheckPointPtr,GlobalState);
if (push_standby == true) {
FlushNewRecoveryPoint(CheckPointPtr);
}
FlushNewRecoveryPoint(CheckPointPtr);
printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr);
CheckPointPtr = InvalidXLogRecPtr;
}