fix: he3db as salve restart failed

This commit is contained in:
wangyao 2023-06-02 06:44:23 +00:00 committed by Gitee
commit c07a41db41
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 26 additions and 10 deletions

View File

@ -52,7 +52,7 @@ int initPthreadPool(void) {
} }
//default 8 thread read //default 8 thread read
if(he3mirror){ if(he3mirror){
gpool = g_thread_pool_new(produceWalFunc,NULL,1,FALSE,NULL); gpool = g_thread_pool_new(produceWalFunc,NULL,4,FALSE,NULL);
}else{ }else{
gpool = g_thread_pool_new(getWalFunc,NULL,8,FALSE,NULL); gpool = g_thread_pool_new(getWalFunc,NULL,8,FALSE,NULL);
} }
@ -75,7 +75,12 @@ void WalTaskFree(void) {
} }
void WalTaskImmediateFree(void) { void WalTaskImmediateFree(void) {
return g_thread_pool_free(gpool,TRUE,TRUE); g_thread_pool_free(gpool,TRUE,TRUE);
gpool = NULL;
}
bool IsFreePthreadPool(void) {
return gpool == NULL;
} }

View File

@ -8321,6 +8321,7 @@ void StartupXLOG(void)
), ),
&private); &private);
tmpxlogreader = he3xlogreader; tmpxlogreader = he3xlogreader;
he3xlogreader->system_identifier = ControlFile->system_identifier;
} else { } else {
xlogreader = xlogreader =
XLogReaderAllocate(wal_segment_size, NULL, XLogReaderAllocate(wal_segment_size, NULL,
@ -8335,7 +8336,6 @@ void StartupXLOG(void)
errmsg("out of memory"), errmsg("out of memory"),
errdetail("Failed while allocating a WAL reading processor."))); errdetail("Failed while allocating a WAL reading processor.")));
xlogreader->system_identifier = ControlFile->system_identifier; xlogreader->system_identifier = ControlFile->system_identifier;
he3xlogreader->system_identifier = ControlFile->system_identifier;
/* /*
* Allocate two page buffers dedicated to WAL consistency checks. We do * Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons: * it this way, rather than just making static arrays, for two reasons:
@ -8755,7 +8755,7 @@ void StartupXLOG(void)
*/ */
abortedRecPtr = InvalidXLogRecPtr; abortedRecPtr = InvalidXLogRecPtr;
missingContrecPtr = InvalidXLogRecPtr; missingContrecPtr = InvalidXLogRecPtr;
pthread_t ntid; pthread_t ntid = 0;
/* REDO */ /* REDO */
if (InRecovery) if (InRecovery)
{ {
@ -9108,6 +9108,8 @@ void StartupXLOG(void)
if(IsBootstrapProcessingMode() != true && InitdbSingle != true) { if(IsBootstrapProcessingMode() != true && InitdbSingle != true) {
*g_redoStartLsn = xlogreader->EndRecPtr; *g_redoStartLsn = xlogreader->EndRecPtr;
if (he3mirror) { if (he3mirror) {
XLogPageReadPrivate *private = he3xlogreader->private_data;
private->fetching_ckpt = false;
err = pthread_create(&ntid,NULL,thr_fn,(void*)he3xlogreader); err = pthread_create(&ntid,NULL,thr_fn,(void*)he3xlogreader);
} else { } else {
err = pthread_create(&ntid,NULL,thr_fn,(void*)xlogreader); err = pthread_create(&ntid,NULL,thr_fn,(void*)xlogreader);
@ -9129,6 +9131,8 @@ void StartupXLOG(void)
if(IsBootstrapProcessingMode() != true && InitdbSingle != true) { if(IsBootstrapProcessingMode() != true && InitdbSingle != true) {
*g_redoStartLsn = xlogreader->EndRecPtr; *g_redoStartLsn = xlogreader->EndRecPtr;
if (he3mirror) { if (he3mirror) {
XLogPageReadPrivate *private = he3xlogreader->private_data;
private->fetching_ckpt = false;
err = pthread_create(&ntid,NULL,thr_fn,(void*)he3xlogreader); err = pthread_create(&ntid,NULL,thr_fn,(void*)he3xlogreader);
} else { } else {
err = pthread_create(&ntid,NULL,thr_fn,(void*)xlogreader); err = pthread_create(&ntid,NULL,thr_fn,(void*)xlogreader);
@ -14511,9 +14515,10 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
if (gRingBufferManger->maxIdx != 0 ) { if (gRingBufferManger->maxIdx != 0 ) {
ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx); ring_buffer_dequeue_arr(gRingBufferManger,gRingBufferManger->maxIdx);
gRingBufferManger->maxIdx = 0; gRingBufferManger->maxIdx = 0;
if (pushFlag == true) { if (pushFlag == true && EnableHotStandby) {
//pushFlag = false;
return -1; return -1;
} else {
pushFlag = false;
} }
} }
} }
@ -14528,10 +14533,12 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
return FirstData->dataLen; return FirstData->dataLen;
} else if (startPtr > FirstData->startLsn) { } else if (startPtr > FirstData->startLsn) {
if (FirstData->startLsn == 0) { if (FirstData->startLsn == 0) {
if (FirstData->checkPointLsn == 0) { if (!he3mirror) {
count++; count++;
} else { } else {
if (FirstData->checkPointLsn != 0){
SetFileReplayLsn(FirstData->checkPointLsn); SetFileReplayLsn(FirstData->checkPointLsn);
}
} }
gRingBufferManger->maxIdx++; gRingBufferManger->maxIdx++;
pushFlag = true; pushFlag = true;
@ -14712,6 +14719,9 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
wal_batch_t *curElem = NULL; wal_batch_t *curElem = NULL;
while((curElem = ring_buffer_queue(gRingBufferManger,walElem)) == NULL) { while((curElem = ring_buffer_queue(gRingBufferManger,walElem)) == NULL) {
usleep(10); usleep(10);
if (IsFreePthreadPool()){
return 0;
}
} }
WalTaskPool(curElem); WalTaskPool(curElem);
} }

View File

@ -5,6 +5,7 @@ extern int initPthreadPool(void);
extern int WalTaskPool(wal_batch_t*data); extern int WalTaskPool(wal_batch_t*data);
extern void WalTaskFree(void); extern void WalTaskFree(void);
extern void WalTaskImmediateFree(void); extern void WalTaskImmediateFree(void);
extern bool IsFreePthreadPool(void);
#endif #endif