slave replay and master recovery

Code Source From: Self Code
Description:  [Optional]
Jira:  #[Optional]
市场项目编号(名称):[Optional]
This commit is contained in:
shenzhengntu 2023-03-26 19:46:41 +08:00
parent 55df57f91e
commit 37ec5b24b4
13 changed files with 52 additions and 49 deletions

View File

@ -19,7 +19,9 @@
#include "storage/proc.h"
#include "access/pushpage.h"
#include "storage/buf_internals.h"
#include "utils/guc.h"
#include "storage/he3db_logindex.h"
static void WakeupFlushWork(void);
XLogRecPtr *g_redoStartLsn;
static HTAB *PageLogindexHash = NULL;
static int MaxNum(int num) {
@ -394,7 +396,7 @@ CleanLogIndexMain(int argc, char *argv[])
int hasData = 0;
pushStandbyPrePoint = pushStandbyPoint;
if (push_standby == true) {
if (push_standby == true||EnableHotStandby == false) {
pushStandbyPoint = GetXLogPushToDisk();
if (pushStandbyPrePoint < pushStandbyPoint) {
hasData++;
@ -501,6 +503,17 @@ typedef struct {
}PageHashQueueShmemStruct;
static PageHashQueueShmemStruct *PageHashQueueShmem;
void pushSlaveReplayQueue(int pageNum) {
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
WakeupFlushWork();
}
while(pageNum > CompletedTaskNum()) {
usleep(10);
}
cleanMap();
}
Latch* GetCurrentLatch(uint32_t pos) {
return &PageHashQueueShmem->pageFlushWakeupLatch[pos];
}

View File

@ -4,6 +4,7 @@
#include "access/xlog.h"
#include "access/xlogrecord.h"
#include <glib.h>
#include "utils/guc.h"
GThreadPool *gpool = NULL;
static __thread GError *gerr = NULL;
static bool IsInitPool = false;

View File

@ -1,6 +1,6 @@
#include "access/ringbuffer.h"
#include <string.h>
#include "access/xlogrecord.h"
/**
* @file
* Implementation of ring buffer functions.
@ -118,23 +118,25 @@ void InitRingBufferSpace(void) {
for(;i<spaceNum; i++) {
gManageFreeList[i].data = &gFreeSpace[i*4*XLOG_BLCKSZ];
}
ring_buffer_init(&gRingBufferManger,gManageFreeList,spaceNum);
ring_buffer_init(gRingBufferManger,gManageFreeList,spaceNum);
}
int walRecordQuery(char**buffer,int* curpos,int* maxspace,uint64 lsn) {
if (gRingBufferManger->maxIdx == 0) {
return -1;
}
int low = 0,high = 0, mid = 0;
if (gManageFreeList[gRingBufferManger->maxIdx-1].startLsn == 0) {
high = gRingBufferManger->maxIdx-2;
int low = 0,high = gRingBufferManger->maxIdx, mid = 0;
if (gRingBufferManger->buffer[high-1].startLsn == 0) {
high -= 2;
} else {
high -=1;
}
bool find = false;
while(low <= high) {
mid = (low + high) / 2;
if (gManageFreeList[mid].startLsn > lsn) {
if (gRingBufferManger->buffer[mid].startLsn > lsn) {
high = mid - 1;
} else if ((gManageFreeList[mid].startLsn < lsn) {
} else if (gRingBufferManger->buffer[mid].startLsn < lsn) {
low = mid + 1;
} else {
find = true;
@ -145,14 +147,14 @@ int walRecordQuery(char**buffer,int* curpos,int* maxspace,uint64 lsn) {
int xllen = -1;
bool extandFlag = false;
if (find == true) {
record = (XLogRecord*)gManageFreeList[mid].data;
record = (XLogRecord*)gRingBufferManger->buffer[mid].data;
xllen = record->xl_tot_len;
} else {
record = (XLogRecord*)gManageFreeList[high].data;
if (gManageFreeList[mid].startLsn + gManageFreeList[mid].dataLen >= lsn) {
record = (XLogRecord*)gRingBufferManger->buffer[high].data;
if (gRingBufferManger->buffer[mid].startLsn + gRingBufferManger->buffer[mid].dataLen >= lsn) {
return -1;
} else {
record = (XLogRecord*)(gManageFreeList[high].data + lsn);
record = (XLogRecord*)(gRingBufferManger->buffer[high].data + (lsn-gRingBufferManger->buffer[high].startLsn));
xllen = record->xl_tot_len;
}
}

View File

@ -8047,17 +8047,6 @@ he3db_xlog_donot_to_replay(XLogRecord *record)
return true;
}
void pushSlaveReplayQueue(int pageNum) {
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
WakeupFlushWork();
}
while(pageNum > CompletedTaskNum()) {
usleep(10);
}
cleanMap();
}
void pushTikv(int onePageListLen,int pageNum,bool flag)
{
if (push_standby == true) {
@ -8149,7 +8138,7 @@ data_buffer_for_replay(XLogReaderState *record,XLogRecPtr startLsn,XLogRecPtr en
updateLastReplayLsn();
LWLockRelease(partition_lock);
}
if (push_standby == true) {
if (push_standby == true || EnableHotStandby == false) {
//tbspace now donot care for test
uint32_t count = addFileKey(&tag);
pushTikv(count,hashMapSize(),false);
@ -9287,7 +9276,7 @@ void StartupXLOG(void)
//initdb no need to replay
#ifndef PG_NOREPLAY
if(IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (push_standby == true) {
if (push_standby == true || EnableHotStandby == false) {
pushTikv(0, hashMapSize(),false);
}
}
@ -14354,6 +14343,14 @@ next_record_is_invalid:
else
return -1;
}
bool startup_shutdown_requested = false;
static bool GetShutDownStatus(void) {
if (startup_shutdown_requested) {
return true;
}
return false;
}
static int
consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqLen,
@ -14365,7 +14362,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
bool pushFlag = false;
do {
if (gRingBufferManger->maxIdx > spaceNum/16 || timeCount > 100000 || pushFlag == true) {
if (push_standby == true) {
if (push_standby == true||EnableHotStandby == false) {
pushTikv(0, hashMapSize(), true);
}
if (gRingBufferManger->maxIdx != 0 ) {
@ -14403,7 +14400,7 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
usleep(10);
}
count = 0;
} while(shutdown_requested == false && (FirstData == NULL || startPtr > FirstData->startLsn));
} while(GetShutDownStatus() == false && (FirstData == NULL || startPtr > FirstData->startLsn));
return -1;
}
@ -14416,7 +14413,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
if (readedUpto == 0) {
readedUpto = startPtr;
}
while(shutdown_requested == false) {
while(GetShutDownStatus() == false) {
if (!WaitForWALToBecomeAvailable(readedUpto + reqLen,
private->randAccess,
private->fetching_ckpt,

View File

@ -362,7 +362,7 @@ restart:
#ifndef PG_NOREPLAY
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (getpid() == startupPid) {
if (push_standby == true) {
if (push_standby == true || EnableHotStandby == false ) {
/* Update shared-memory status */
XLogRecPtr prevPushPoint = PrePushPtr;
if (!XLogRecPtrIsInvalid(CheckPointPtr)) {
@ -722,7 +722,7 @@ restart:
#ifndef PG_NOREPLAY
if (IsBootstrapProcessingMode() != true && InitdbSingle!=true) {
if (getpid() == startupPid) {
if (push_standby == true) {
if (push_standby == true||EnableHotStandby == false) {
// Update shared-memory status
XLogRecPtr prevPushPoint = PrePushPtr;
if (!XLogRecPtrIsInvalid(CheckPointPtr)) {

View File

@ -103,8 +103,10 @@ StartupProcShutdownHandler(SIGNAL_ARGS)
if (in_restore_command)
proc_exit(1);
else
else {
shutdown_requested = true;
startup_shutdown_requested = true;
}
WakeupRecovery();
errno = save_errno;

View File

@ -493,7 +493,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
ForkNumber forkNum,
BlockNumber blockNum,
BufferAccessStrategy strategy,
bool *foundPtr);
bool *foundPtr,bool *exist);
static BufferDesc *He3DBBufferAlloc_replay(SMgrRelation smgr,
char relpersistence,
ForkNumber forkNum,
@ -857,12 +857,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
bool isExtend;
bool isLocalBuf = SmgrIsTemp(smgr);
/* he3db: local tem buffer for pageXlog */
char *pageXlogBuf;
int nbytes;
*hit = false;
pageXlogBuf = NULL;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@ -1166,12 +1161,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
blockNum,
relpath(smgr->smgr_rnode, forkNum))));
MemSet((char *) bufBlock, 0, BLCKSZ);
if(pageXlogBuf != NULL)
{
free(pageXlogBuf);
pageXlogBuf = NULL;
}
}
else
ereport(ERROR,

View File

@ -322,7 +322,7 @@ void He3dbLogIndexTblListInit(void)
log_index_mem_list->table_start_index = 0;
log_index_mem_list->active_table_index = 0;
log_index_mem_list->table_cap = logindex_mem_tbl_size;
SpinLockInit(&(log_index_mem_list->lock));
//SpinLockInit(&(log_index_mem_list->lock));
for (uint64 i = 0; i < log_index_mem_list->table_cap; i++) {
// set mem table init values
SpinLockInit(&(log_index_mem_list->mem_table[i].meta.meta_lock));

View File

@ -279,15 +279,12 @@ GetBackendTypeDesc(BackendType backendType)
case B_PARALLEL_FLUSH:
backendDesc = "parallel flush";
break;
<<<<<<< HEAD
case B_CLEAN_LOGINDEX:
backendDesc = "clean logindex";
break;
/*case B_RELATIONCLEANUP:
backendDesc = "relation cleanup";
break;*/
=======
>>>>>>> 859cb43019a91590e91ebe381861f9fe3f6b94a2
}
return backendDesc;

View File

@ -39,6 +39,7 @@ extern Size PageHashMapSize(void);
extern void PageHashQueueShmemInit(void);
Size LogindexHashAllShmemSize(void);
void InitLogindexHashBrucket(void);
void pushSlaveReplayQueue(int pageNum);
extern XLogRecPtr *g_redoStartLsn;
#endif

View File

@ -64,6 +64,7 @@ typedef struct wal_batch_t {
} wal_batch_t;
extern ring_buffer_t* gRingBufferManger;
extern const int spaceNum;
/**
* Structure which holds a ring buffer.
* The buffer contains a buffer array

View File

@ -31,6 +31,7 @@
extern int sync_method;
extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */
extern bool startup_shutdown_requested;
/*
* Prior to 8.4, all activity during recovery was carried out by the startup

View File

@ -18,5 +18,4 @@ extern void PreRestoreCommand(void);
extern void PostRestoreCommand(void);
extern bool IsPromoteSignaled(void);
extern void ResetPromoteSignaled(void);
#endif /* _STARTUP_H */