From 37ec5b24b44855c0bd6621f125a89b143ba7693a Mon Sep 17 00:00:00 2001 From: shenzhengntu <503699317@qq.com> Date: Sun, 26 Mar 2023 19:46:41 +0800 Subject: [PATCH] slave replay and master recovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Code Source From: Self Code Description: [Optional] Jira: #[Optional] 市场项目编号(名称):[Optional] --- src/backend/access/transam/pagehashqueue.c | 17 +++++++++++-- src/backend/access/transam/pthreadpool.c | 1 + src/backend/access/transam/ringbuffer.c | 24 ++++++++++-------- src/backend/access/transam/xlog.c | 29 ++++++++++------------ src/backend/access/transam/xlogreader.c | 4 +-- src/backend/postmaster/startup.c | 4 ++- src/backend/storage/buffer/bufmgr.c | 13 +--------- src/backend/storage/lmgr/he3db_logindex.c | 2 +- src/backend/utils/init/miscinit.c | 3 --- src/include/access/pagehashqueue.h | 1 + src/include/access/ringbuffer.h | 1 + src/include/access/xlog.h | 1 + src/include/postmaster/startup.h | 1 - 13 files changed, 52 insertions(+), 49 deletions(-) diff --git a/src/backend/access/transam/pagehashqueue.c b/src/backend/access/transam/pagehashqueue.c index 8249225..e379453 100644 --- a/src/backend/access/transam/pagehashqueue.c +++ b/src/backend/access/transam/pagehashqueue.c @@ -19,7 +19,9 @@ #include "storage/proc.h" #include "access/pushpage.h" #include "storage/buf_internals.h" - +#include "utils/guc.h" +#include "storage/he3db_logindex.h" +static void WakeupFlushWork(void); XLogRecPtr *g_redoStartLsn; static HTAB *PageLogindexHash = NULL; static int MaxNum(int num) { @@ -394,7 +396,7 @@ CleanLogIndexMain(int argc, char *argv[]) int hasData = 0; pushStandbyPrePoint = pushStandbyPoint; - if (push_standby == true) { + if (push_standby == true||EnableHotStandby == false) { pushStandbyPoint = GetXLogPushToDisk(); if (pushStandbyPrePoint < pushStandbyPoint) { hasData++; @@ -501,6 +503,17 @@ typedef struct { }PageHashQueueShmemStruct; static PageHashQueueShmemStruct *PageHashQueueShmem; +void pushSlaveReplayQueue(int pageNum) { + if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) { + pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1); + WakeupFlushWork(); + } + while(pageNum > CompletedTaskNum()) { + usleep(10); + } + cleanMap(); +} + Latch* GetCurrentLatch(uint32_t pos) { return &PageHashQueueShmem->pageFlushWakeupLatch[pos]; } diff --git a/src/backend/access/transam/pthreadpool.c b/src/backend/access/transam/pthreadpool.c index 507f85d..9796199 100644 --- a/src/backend/access/transam/pthreadpool.c +++ b/src/backend/access/transam/pthreadpool.c @@ -4,6 +4,7 @@ #include "access/xlog.h" #include "access/xlogrecord.h" #include +#include "utils/guc.h" GThreadPool *gpool = NULL; static __thread GError *gerr = NULL; static bool IsInitPool = false; diff --git a/src/backend/access/transam/ringbuffer.c b/src/backend/access/transam/ringbuffer.c index 87d51a2..81f1c89 100644 --- a/src/backend/access/transam/ringbuffer.c +++ b/src/backend/access/transam/ringbuffer.c @@ -1,6 +1,6 @@ #include "access/ringbuffer.h" #include - +#include "access/xlogrecord.h" /** * @file * Implementation of ring buffer functions. @@ -118,23 +118,25 @@ void InitRingBufferSpace(void) { for(;imaxIdx == 0) { return -1; } - int low = 0,high = 0, mid = 0; - if (gManageFreeList[gRingBufferManger->maxIdx-1].startLsn == 0) { - high = gRingBufferManger->maxIdx-2; + int low = 0,high = gRingBufferManger->maxIdx, mid = 0; + if (gRingBufferManger->buffer[high-1].startLsn == 0) { + high -= 2; + } else { + high -=1; } bool find = false; while(low <= high) { mid = (low + high) / 2; - if (gManageFreeList[mid].startLsn > lsn) { + if (gRingBufferManger->buffer[mid].startLsn > lsn) { high = mid - 1; - } else if ((gManageFreeList[mid].startLsn < lsn) { + } else if (gRingBufferManger->buffer[mid].startLsn < lsn) { low = mid + 1; } else { find = true; @@ -145,14 +147,14 @@ int walRecordQuery(char**buffer,int* curpos,int* maxspace,uint64 lsn) { int xllen = -1; bool extandFlag = false; if (find == true) { - record = (XLogRecord*)gManageFreeList[mid].data; + record = (XLogRecord*)gRingBufferManger->buffer[mid].data; xllen = record->xl_tot_len; } else { - record = (XLogRecord*)gManageFreeList[high].data; - if (gManageFreeList[mid].startLsn + gManageFreeList[mid].dataLen >= lsn) { + record = (XLogRecord*)gRingBufferManger->buffer[high].data; + if (gRingBufferManger->buffer[mid].startLsn + gRingBufferManger->buffer[mid].dataLen >= lsn) { return -1; } else { - record = (XLogRecord*)(gManageFreeList[high].data + lsn); + record = (XLogRecord*)(gRingBufferManger->buffer[high].data + (lsn-gRingBufferManger->buffer[high].startLsn)); xllen = record->xl_tot_len; } } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8311bc5..d57b065 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8047,17 +8047,6 @@ he3db_xlog_donot_to_replay(XLogRecord *record) return true; } -void pushSlaveReplayQueue(int pageNum) { - if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) { - pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1); - WakeupFlushWork(); - } - while(pageNum > CompletedTaskNum()) { - usleep(10); - } - cleanMap(); -} - void pushTikv(int onePageListLen,int pageNum,bool flag) { if (push_standby == true) { @@ -8149,7 +8138,7 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en updateLastReplayLsn(); LWLockRelease(partition_lock); } - if (push_standby == true) { + if (push_standby == true || EnableHotStandby == false) { //tbspace now donot care for test uint32_t count = addFileKey(&tag); pushTikv(count,hashMapSize(),false); @@ -9287,7 +9276,7 @@ void StartupXLOG(void) //initdb no need to replay #ifndef PG_NOREPLAY if(IsBootstrapProcessingMode() != true && InitdbSingle!=true) { - if (push_standby == true) { + if (push_standby == true || EnableHotStandby == false) { pushTikv(0, hashMapSize(),false); } } @@ -14354,6 +14343,14 @@ next_record_is_invalid: else return -1; } + +bool startup_shutdown_requested = false; +static bool GetShutDownStatus(void) { + if (startup_shutdown_requested) { + return true; + } + return false; +} static int consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqLen, @@ -14365,7 +14362,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL bool pushFlag = false; do { if (gRingBufferManger->maxIdx > spaceNum/16 || timeCount > 100000 || pushFlag == true) { - if (push_standby == true) { + if (push_standby == true||EnableHotStandby == false) { pushTikv(0, hashMapSize(), true); } if (gRingBufferManger->maxIdx != 0 ) { @@ -14403,7 +14400,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL usleep(10); } count = 0; - } while(shutdown_requested == false && (FirstData == NULL || startPtr > FirstData->startLsn)); + } while(GetShutDownStatus() == false && (FirstData == NULL || startPtr > FirstData->startLsn)); return -1; } @@ -14416,7 +14413,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, if (readedUpto == 0) { readedUpto = startPtr; } - while(shutdown_requested == false) { + while(GetShutDownStatus() == false) { if (!WaitForWALToBecomeAvailable(readedUpto + reqLen, private->randAccess, private->fetching_ckpt, diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 413b488..b421cb9 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -362,7 +362,7 @@ restart: #ifndef PG_NOREPLAY if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) { if (getpid() == startupPid) { - if (push_standby == true) { + if (push_standby == true || EnableHotStandby == false ) { /* Update shared-memory status */ XLogRecPtr prevPushPoint = PrePushPtr; if (!XLogRecPtrIsInvalid(CheckPointPtr)) { @@ -722,7 +722,7 @@ restart: #ifndef PG_NOREPLAY if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) { if (getpid() == startupPid) { - if (push_standby == true) { + if (push_standby == true||EnableHotStandby == false) { // Update shared-memory status XLogRecPtr prevPushPoint = PrePushPtr; if (!XLogRecPtrIsInvalid(CheckPointPtr)) { diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c index 2983989..b9d2e45 100644 --- a/src/backend/postmaster/startup.c +++ b/src/backend/postmaster/startup.c @@ -103,8 +103,10 @@ StartupProcShutdownHandler(SIGNAL_ARGS) if (in_restore_command) proc_exit(1); - else + else { shutdown_requested = true; + startup_shutdown_requested = true; + } WakeupRecovery(); errno = save_errno; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fe339be..83c2406 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -493,7 +493,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, - bool *foundPtr); + bool *foundPtr,bool *exist); static BufferDesc *He3DBBufferAlloc_replay(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, @@ -857,12 +857,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, bool isExtend; bool isLocalBuf = SmgrIsTemp(smgr); /* he3db: local tem buffer for pageXlog */ - char *pageXlogBuf; - int nbytes; - *hit = false; - pageXlogBuf = NULL; - /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(CurrentResourceOwner); @@ -1166,12 +1161,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, blockNum, relpath(smgr->smgr_rnode, forkNum)))); MemSet((char *) bufBlock, 0, BLCKSZ); - - if(pageXlogBuf != NULL) - { - free(pageXlogBuf); - pageXlogBuf = NULL; - } } else ereport(ERROR, diff --git a/src/backend/storage/lmgr/he3db_logindex.c b/src/backend/storage/lmgr/he3db_logindex.c index dace4e2..2243c66 100644 --- a/src/backend/storage/lmgr/he3db_logindex.c +++ b/src/backend/storage/lmgr/he3db_logindex.c @@ -322,7 +322,7 @@ void He3dbLogIndexTblListInit(void) log_index_mem_list->table_start_index = 0; log_index_mem_list->active_table_index = 0; log_index_mem_list->table_cap = logindex_mem_tbl_size; - SpinLockInit(&(log_index_mem_list->lock)); + //SpinLockInit(&(log_index_mem_list->lock)); for (uint64 i = 0; i < log_index_mem_list->table_cap; i++) { // set mem table init values SpinLockInit(&(log_index_mem_list->mem_table[i].meta.meta_lock)); diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index e565dae..0f957c6 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -279,15 +279,12 @@ GetBackendTypeDesc(BackendType backendType) case B_PARALLEL_FLUSH: backendDesc = "parallel flush"; break; -<<<<<<< HEAD case B_CLEAN_LOGINDEX: backendDesc = "clean logindex"; break; /*case B_RELATIONCLEANUP: backendDesc = "relation cleanup"; break;*/ -======= ->>>>>>> 859cb43019a91590e91ebe381861f9fe3f6b94a2 } return backendDesc; diff --git a/src/include/access/pagehashqueue.h b/src/include/access/pagehashqueue.h index cceaa5e..c619cee 100644 --- a/src/include/access/pagehashqueue.h +++ b/src/include/access/pagehashqueue.h @@ -39,6 +39,7 @@ extern Size PageHashMapSize(void); extern void PageHashQueueShmemInit(void); Size LogindexHashAllShmemSize(void); void InitLogindexHashBrucket(void); +void pushSlaveReplayQueue(int pageNum); extern XLogRecPtr *g_redoStartLsn; #endif diff --git a/src/include/access/ringbuffer.h b/src/include/access/ringbuffer.h index f08a18a..6d42122 100644 --- a/src/include/access/ringbuffer.h +++ b/src/include/access/ringbuffer.h @@ -64,6 +64,7 @@ typedef struct wal_batch_t { } wal_batch_t; extern ring_buffer_t* gRingBufferManger; +extern const int spaceNum; /** * Structure which holds a ring buffer. * The buffer contains a buffer array diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 96c84f3..98a0a06 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -31,6 +31,7 @@ extern int sync_method; extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */ +extern bool startup_shutdown_requested; /* * Prior to 8.4, all activity during recovery was carried out by the startup diff --git a/src/include/postmaster/startup.h b/src/include/postmaster/startup.h index bf6adf1..03f41b3 100644 --- a/src/include/postmaster/startup.h +++ b/src/include/postmaster/startup.h @@ -18,5 +18,4 @@ extern void PreRestoreCommand(void); extern void PostRestoreCommand(void); extern bool IsPromoteSignaled(void); extern void ResetPromoteSignaled(void); - #endif /* _STARTUP_H */