diff --git a/src/backend/access/transam/pagehashqueue.c b/src/backend/access/transam/pagehashqueue.c index 285ce9e..b87a1ec 100644 --- a/src/backend/access/transam/pagehashqueue.c +++ b/src/backend/access/transam/pagehashqueue.c @@ -411,6 +411,8 @@ CleanLogIndexMain(int argc, char *argv[]) TagNode *next = tagList->next; int pageNum = 0; while(next!=NULL) { + // elog(LOG,"add tag rel %d, fork %d, blk %d", + // next->tag.tag.rnode.relNode, next->tag.tag.forkNum, next->tag.tag.blockNum); addFileKey(&next->tag.tag); next = next->next; pageNum++; @@ -419,10 +421,12 @@ CleanLogIndexMain(int argc, char *argv[]) pushSlaveReplayQueue(pageNum); hasData++; PrevPushPoint = LastPushPoint+1; + SetXLogPushToDisk(PrevPushPoint); pushStandbyPoint = GetConsistLsn(PrevPushPoint); } else { LastPushPoint = PrevPushPoint = lastReplPtr; if (pushStandbyPrePoint < PrevPushPoint) { + SetXLogPushToDisk(PrevPushPoint); pushStandbyPoint = GetConsistLsn(PrevPushPoint+1); } } @@ -438,7 +442,7 @@ CleanLogIndexMain(int argc, char *argv[]) elog(LOG,"start threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPoint)); CleanLogIndexByPage(pushStandbyPoint); //threadCleanLogIndex(LastPushPoint); - elog(LOG,"end threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPrePoint)); + elog(LOG,"end threadCleanLogIndex lsn from %X/%X to %X/%X",LSN_FORMAT_ARGS(pushStandbyPrePoint),LSN_FORMAT_ARGS(pushStandbyPoint)); } if (hasData != 0) { continue; @@ -672,10 +676,13 @@ void ProcFlushBufferToDisk(BufferTag*tag) { RBM_NORMAL); if (!BufferIsValid(buffer)) { - elog(FATAL,"ProcFlushBufferToDisk is invalid rel %d,flk %d,blk %d",tag->rnode.relNode,tag->forkNum,tag->blockNum); + elog(PANIC,"ProcFlushBufferToDisk is invalid rel %d,flk %d,blk %d",tag->rnode.relNode,tag->forkNum,tag->blockNum); pg_atomic_fetch_add_u32(&PageHashQueueShmem->taskNum,1); return; } + + // elog(LOG, "replay rel %d, fork %d, blkno %d, pagelsn %X/%X", tag->rnode.relNode, + // tag->forkNum,tag->blockNum, LSN_FORMAT_ARGS(PageGetLSN(BufferGetPage(buffer)))); //slave no need to flush disk if (push_standby == true) { BufferDesc *buf; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9fca9e6..8ab9d28 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7990,7 +7990,7 @@ void pushTikv(int onePageListLen,int pageNum,bool flag) if (ApplyLsn < PushPtr) { ApplyLsn = PushPtr; } - if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn) { + if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) { InsertConsistToKV(ApplyLsn); } XLogCtl->pushToDisk = ApplyLsn; @@ -8001,7 +8001,7 @@ void pushTikv(int onePageListLen,int pageNum,bool flag) if (ApplyLsn < PushPtr) { ApplyLsn = PushPtr; } - if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn) { + if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) { InsertConsistToKV(ApplyLsn); } XLogCtl->pushToDisk = ApplyLsn; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 181cdd1..49e5dbc 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -762,21 +762,8 @@ restart: printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); CheckPointPtr = InvalidXLogRecPtr; } - //need to tell push standby has new standby add - while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { - //XLogRecPtr lsn = GetFlushXlogPtr(); - XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr); - if (tmpLsn !=InvalidXLogRecPtr) { - if (tmpLsn <= RecPtr) { - pg_usleep(50000); - continue; - } else { - ApplyLsn = tmpLsn; - } - } else { - ApplyLsn = InvalidXLogRecPtr; - } - break; + if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { + ApplyLsn = InvalidXLogRecPtr; } } } @@ -1124,21 +1111,8 @@ restart: printf("curlsn==%x==prevPushPoint==%x==PushPoint==%x==last check point lsn====%x\n",RecPtr,prevPushPoint,PushPtr,CheckPointPtr); CheckPointPtr = InvalidXLogRecPtr; } - //need to tell push standby has new standby add - while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { - //XLogRecPtr lsn = GetFlushXlogPtr(); - XLogRecPtr tmpLsn = QueryMinLsn(InvalidXLogRecPtr); - if (tmpLsn !=InvalidXLogRecPtr) { - if (tmpLsn <= RecPtr) { - usleep(200); - continue; - } else { - ApplyLsn = tmpLsn; - } - } else { - ApplyLsn = InvalidXLogRecPtr; - } - break; + if (ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) { + ApplyLsn = InvalidXLogRecPtr; } } } diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index eecd193..a85af1b 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -632,7 +632,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, else { /* hm, page doesn't exist in file */ - if(!he3mirror){ + if(!he3mirror && he3share){ if (mode == RBM_NORMAL && EnableHotStandby != false && *isPromoteIsTriggered == false) { log_invalid_page(rnode, forknum, blkno, false); @@ -668,7 +668,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, } } - if (!he3mirror && mode == RBM_NORMAL) + if (he3share && !he3mirror && mode == RBM_NORMAL) { /* check that page has been initialized */ Page page = (Page) BufferGetPage(buffer); diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index 8517259..d25d41b 100644 --- a/src/backend/catalog/storage.c +++ b/src/backend/catalog/storage.c @@ -925,7 +925,7 @@ smgr_redo(XLogReaderState *record) reln = smgropen(xlrec->rnode, InvalidBackendId); /* He3DB: propeller instance and He3DB slave instance not create rel file*/ - if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror) + if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror || !he3share) { smgrcreate(reln, xlrec->forkNum, true); } @@ -949,7 +949,7 @@ smgr_redo(XLogReaderState *record) * log as best we can until the drop is seen. */ /* He3DB: propeller instance and He3DB slave instance not create rel file*/ - if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror) + if (*isPromoteIsTriggered || !EnableHotStandby || he3mirror || !he3share) { smgrcreate(reln, MAIN_FORKNUM, true); } @@ -1008,8 +1008,8 @@ smgr_redo(XLogReaderState *record) } /* Do the real work to truncate relation forks */ - if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered)) - smgrtruncate(reln, forks, nforks, blocks); + if (nforks > 0 && (!EnableHotStandby || *isPromoteIsTriggered || !he3share)) + smgrtruncatelsn(reln, forks, nforks, blocks, record->ReadRecPtr); /* * Update upper-level FSM pages to account for the truncation. This is diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 356f6a9..d58e96b 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -2258,8 +2258,28 @@ dbase_redo(XLogReaderState *record) * We don't need to copy subdirectories */ /* He3DB: propeller instance and He3DB slave instance not create db file*/ - if (!EnableHotStandby || *isPromoteIsTriggered || he3mirror) + if (!EnableHotStandby || *isPromoteIsTriggered || he3mirror || !he3share) { + // int count = 0; + // for (;;) + // { + // XLogRecPtr pushlsn; + // XLogRecPtr lastlsn = record->currRecPtr; + // pushlsn = QueryPushChkpointLsn(); + // if (pushlsn == InvalidXLogRecPtr) + // ereport(ERROR, + // (errcode(ERRCODE_INTERNAL_ERROR), + // errmsg("push standby's latest apply lsn shouldn't be 0"))); + // if (lastlsn <= pushlsn) + // break; + // if (count > 100) + // ereport(ERROR, + // (errcode(ERRCODE_INTERNAL_ERROR), + // errmsg("push standby's latest apply lsn(%X/%X) is still behind primary(%X/%X) after try 100 times.", + // LSN_FORMAT_ARGS(pushlsn), LSN_FORMAT_ARGS(lastlsn)))); + // pg_usleep(1000000L); + // count++; + // } copydir(src_path, dst_path, false); } } diff --git a/src/backend/postmaster/secondbuffer.c b/src/backend/postmaster/secondbuffer.c index 71bfb40..04894fe 100644 --- a/src/backend/postmaster/secondbuffer.c +++ b/src/backend/postmaster/secondbuffer.c @@ -477,8 +477,29 @@ GetPageFromLocalBuffer(PageKey *pk,uint8_t *buffer) } } +uint64_t +SwapLsn(uint64_t lsn) { + #ifndef WORDS_BIGENDIAN + /* trans lsn from little endian to big endian in memory + * eg: 0x12345678 ===> 0x78563412 + */ + + uint32 low, high; + low = (uint32) (lsn); + high = (uint32) ((lsn) >> 32); + + low = (low << 16) | (low >> 16); + low = ((low & 0x00FF00FF) << 8) | ((low >> 8) & 0x00FF00FF); + + high = (high << 16) | (high >> 16); + high = ((high & 0x00FF00FF) << 8) | ((high >> 8) & 0x00FF00FF); + return ((uint64)(low)) << 32 | (uint64)(high); + #endif + return lsn; +} + Bufrd -GetWalFromLocalBuffer(WalLdPageKey *wpk) +GetWalFromLocalBuffer(WalLdPageKey *wpk, uint64_t replyLsn) { MDB_txn *tmptxn; MDB_cursor *tmpcursor; @@ -508,24 +529,35 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk) if (tb != 0) { //TODO + ereport(PANIC,errmsg("mdb_txn_begin failed,error is:%d",tb)); } co = mdb_cursor_open(tmptxn, walDbi,&tmpcursor); - if (tb != 0) + if (co != 0) { //TODO + ereport(PANIC,errmsg("mdb_txn_open failed,error is:%d",co)); } - if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET)) != 0) + // ereport(LOG,errmsg("535 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); + if ((cg = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET_RANGE)) != 0) { + ereport(LOG, errmsg("mdb_txn_get failed,error is:%d, dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + cg, dbid, relid, forkno, blkno, SwapLsn(wpk->pageLsn), wpk->partition)); bufrd.buf = waldata; mdb_cursor_close(tmpcursor); mdb_txn_abort(tmptxn); return bufrd; } + wpk = (WalLdPageKey *)key.mv_data; + + // ereport(LOG,errmsg("549 get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); while (wpk->sk.dbid == dbid && wpk->sk.relid == relid && wpk->sk.forkno == forkno - && wpk->sk.blkno == blkno) + && wpk->sk.blkno == blkno + && SwapLsn(wpk->pageLsn) < replyLsn) { memcpy(waldata + waldatalen,data.mv_data,data.mv_size); waldatalen += data.mv_size; @@ -547,6 +579,8 @@ GetWalFromLocalBuffer(WalLdPageKey *wpk) } wpk = (WalLdPageKey *)key.mv_data; + // ereport(LOG,errmsg("get key dbid %d, relid %d, fork %d, blk %d, pagelsn %ld, part %d", + // wpk->sk.dbid, wpk->sk.relid, wpk->sk.forkno, wpk->sk.blkno, SwapLsn(wpk->pageLsn), wpk->partition)); } } @@ -607,12 +641,14 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) if (tb != 0) { //TODO + ereport(LOG,errmsg("put mdb_txn_begin failed,error is:%d",tb)); } co = mdb_cursor_open(tmptxn,walDbi,&tmpcursor); if (co != 0) { //TODO + ereport(LOG,errmsg("put mdb_txn_open failed,error is:%d",co)); } for(int i = 0; i < length; i ++) @@ -636,17 +672,19 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = 511; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(511); - wlpk.pageLsn = ks[i].lsn; + wlpk.pageLsn = SwapLsn(ks[i].lsn); wlpk.partition = part; + + memcpy(xlogContent,buf + (part * 511), 511); //502 = 511 - 9 part ++; - memcpy(xlogContent,buf, 511); //502 = 511 - 9 totallen -= 511; key.mv_data = &wlpk; data.mv_data = xlogContent; - cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { + ereport(PANIC,errmsg("651 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, SwapLsn(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) { break; @@ -660,14 +698,16 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = totallen; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(totallen); - wlpk.pageLsn = ks[i].lsn; + wlpk.pageLsn = SwapLsn(ks[i].lsn); wlpk.partition = part; key.mv_data = &wlpk; - memcpy(xlogContent,buf,totallen); + memcpy(xlogContent,buf + (part * 511),totallen); data.mv_data = xlogContent; cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { + ereport(LOG,errmsg("673 mdb_txn_put failed,error is:%d, rel %d, forkno %d, blk %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); if (cp == MDB_KEYEXIST) { break; @@ -684,7 +724,7 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) data.mv_size = totallen; data.mv_data = NULL; xlogContent = (uint8_t *)malloc(totallen); - wlpk.pageLsn = ks[i].lsn; + wlpk.pageLsn = SwapLsn(ks[i].lsn); wlpk.partition = 0; key.mv_data = &wlpk; memcpy(xlogContent,buf,totallen); @@ -692,8 +732,15 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) cp = mdb_cursor_put(tmpcursor,&key,&data, MDB_NODUPDATA); if (cp != 0) { + ereport(LOG,errmsg("730 mdb_txn_put failed,error is:%d, dbid %d, rel %d, forkno %d, blk %d, pagelsn %ld, part %d", + cp, wlpk.sk.dbid, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno, + SwapLsn(wlpk.pageLsn), wlpk.partition)); if (cp == MDB_KEYEXIST) { + cp = mdb_cursor_get(tmpcursor, &key, &data, MDB_SET); + if (cp != 0) + ereport(LOG,errmsg(" 737 mdb_txn_get failed,error is:%d, rel %d, forkno %d, blk %d", + cp, wlpk.sk.relid, wlpk.sk.forkno, wlpk.sk.blkno)); continue; } } @@ -703,7 +750,9 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length) } mdb_cursor_close(tmpcursor); - mdb_txn_commit(tmptxn); + co = mdb_txn_commit(tmptxn); + if (co != 0) + ereport(LOG,errmsg("put mdb_txn_commit failed,error is:%d",co)); tmpcursor = NULL; tmptxn = NULL; // pthread_mutex_unlock(&q_lock); @@ -746,9 +795,11 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd) WalLdPageKey wlpk; wlpk.sk.dbid = pk.relfileNode.dbNode; wlpk.sk.relid = pk.relfileNode.relNode; - wlpk.pageLsn = pk.pageLsn; + wlpk.sk.forkno = pk.forkNo; + wlpk.sk.blkno = pk.blkNo; + wlpk.pageLsn = SwapLsn(pk.pageLsn); wlpk.partition = 0; - Bufrd waldata = GetWalFromLocalBuffer(&pk); + Bufrd waldata = GetWalFromLocalBuffer(&wlpk, pk.replyLsn); if (waldata.count > 0) { bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 7c6d623..a9297e3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -245,6 +245,7 @@ static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); +static XLogRecPtr GetStandbyReplayRecPtr(void); static void IdentifySystem(void); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); @@ -412,7 +413,7 @@ IdentifySystem(void) if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ - logptr = GetStandbyFlushRecPtr(); + logptr = GetStandbyReplayRecPtr(); } else logptr = GetFlushRecPtr(); @@ -639,7 +640,7 @@ StartReplication(StartReplicationCmd *cmd) if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ - FlushPtr = GetStandbyFlushRecPtr(); + FlushPtr = GetStandbyReplayRecPtr(); } else FlushPtr = GetFlushRecPtr(); @@ -2911,7 +2912,7 @@ XLogSendTiKVPhysical(void) */ bool becameHistoric = false; - SendRqstPtr = GetStandbyFlushRecPtr(); + SendRqstPtr = GetStandbyReplayRecPtr(); if (!RecoveryInProgress()) { @@ -2927,7 +2928,7 @@ XLogSendTiKVPhysical(void) /* * Still a cascading standby. But is the timeline we're sending * still the one recovery is recovering from? ThisTimeLineID was - * updated by the GetStandbyFlushRecPtr() call above. + * updated by the GetStandbyReplayRecPtr() call above. */ if (sendTimeLine != ThisTimeLineID) becameHistoric = true; @@ -3220,7 +3221,7 @@ XLogSendPhysical(void) */ bool becameHistoric = false; - SendRqstPtr = GetStandbyFlushRecPtr(); + SendRqstPtr = GetStandbyReplayRecPtr(); if (!RecoveryInProgress()) { @@ -3236,7 +3237,7 @@ XLogSendPhysical(void) /* * Still a cascading standby. But is the timeline we're sending * still the one recovery is recovering from? ThisTimeLineID was - * updated by the GetStandbyFlushRecPtr() call above. + * updated by the GetStandbyReplayRecPtr() call above. */ if (sendTimeLine != ThisTimeLineID) becameHistoric = true; @@ -3627,6 +3628,28 @@ GetStandbyFlushRecPtr(void) return result; } +static XLogRecPtr +GetStandbyReplayRecPtr(void) +{ + XLogRecPtr replayPtr; + XLogRecPtr consistPtr; + XLogRecPtr result; + TimeLineID replayTLI; + + /* + * We can safely send what's already been replayed. + */ + + replayPtr = GetXLogReplayRecPtr(&replayTLI); + consistPtr = GetXLogPushToDisk(); + Assert(consistPtr <= replayPtr); + + ThisTimeLineID = replayTLI; + + + return consistPtr; +} + /* * Request walsenders to reload the currently-open WAL file */ diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 14e75db..1ea95f9 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1933,7 +1933,8 @@ retry: /* safety check: should definitely not be our *own* pin */ if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) elog(ERROR, "buffer is pinned in InvalidateBuffer"); - WaitIO(buf); + // WaitIO(buf); + pg_usleep(1000L); goto retry; } @@ -5077,7 +5078,17 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf_state |= set_flag_bits; + if (!(IsBootstrapProcessingMode() == true || InitdbSingle == true) && (InRecovery || EnableHotStandby) && set_flag_bits == BM_VALID) + { + XLogRecPtr pageLsn = BufferGetLSN(buf); + XLogRecPtr replayLsn = GetXLogReplayRecPtr(NULL); + LsnNode *head = GetLogIndexByPage(&buf->tag, pageLsn, replayLsn); + if (head->next != NULL) + buf_state &= ~BM_VALID; + FreeLsnNode(head); + } else { + buf_state |= set_flag_bits; + } UnlockBufHdr(buf, buf_state); if (!bulk_io_is_in_progress) { diff --git a/src/backend/storage/lmgr/he3db_logindex.c b/src/backend/storage/lmgr/he3db_logindex.c index 6c9ce93..76933b1 100644 --- a/src/backend/storage/lmgr/he3db_logindex.c +++ b/src/backend/storage/lmgr/he3db_logindex.c @@ -603,7 +603,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn) if(mem_tbl->meta.max_lsn < start_lsn) { continue; - }else if(mem_tbl->meta.min_lsn >= end_lsn) + }else if(mem_tbl->meta.min_lsn > end_lsn) { // there is no suitability lsn_list after this mem table LWLockRelease(LogIndexMemListLock); @@ -628,7 +628,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn) page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]); uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1); page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]); - if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn) + if(page_min_lsn > end_lsn || page_max_lsn < start_lsn) { continue; } @@ -646,7 +646,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn) LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]); // current mem table no suitability lsn_list - if(!(mem_tbl->meta.max_lsn < start_lsn || mem_tbl->meta.min_lsn >= end_lsn)) + if(!(mem_tbl->meta.max_lsn < start_lsn || mem_tbl->meta.min_lsn > end_lsn)) { end_lsn = Min(end_lsn, mem_tbl->meta.max_lsn); head_node->tag.lsn = end_lsn; @@ -665,7 +665,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn) page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]); uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1); page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]); - if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn) + if(page_min_lsn > end_lsn || page_max_lsn < start_lsn) { continue; } diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index 55f2a00..e168ffd 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -781,6 +781,8 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo GetPageFromCurrentNode(pageKey, bufrd); count = bufrd->count; + // elog(LOG, "read page from local rel %d, fork %d, blk %d, nbytes %d, replaylsn %X/%X", + // pageTag.rnode.relNode, pageTag.forkNum, pageTag.blockNum, count, LSN_FORMAT_ARGS(lsn)); } if (count > 0) @@ -838,10 +840,12 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo WalLdPageKey wlpk; wlpk.sk.dbid = pageKey.relfileNode.dbNode; wlpk.sk.relid = pageKey.relfileNode.relNode; - wlpk.pageLsn = pageKey.pageLsn; + wlpk.sk.forkno = pageKey.forkNo; + wlpk.sk.blkno = pageKey.blkNo; + wlpk.pageLsn = SwapLsn(pageKey.pageLsn); wlpk.partition = 0; result.count = 0; - result = GetWalFromLocalBuffer(&wlpk); + result = GetWalFromLocalBuffer(&wlpk, lsn); if (result.count == 0) { diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 73ed8e6..34aac02 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -484,7 +484,7 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, // return; // } - if ((push_standby != true && (EnableHotStandby != true || *isPromoteIsTriggered)) || IsBootstrapProcessingMode() || InitdbSingle || he3mirror) { + if (!he3share || (push_standby != true && (EnableHotStandby != true || *isPromoteIsTriggered)) || IsBootstrapProcessingMode() || InitdbSingle || he3mirror) { smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum, buffer, skipFsync); // elog(LOG,"smgrextend reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum); diff --git a/src/backend/storage/sync/sync.c b/src/backend/storage/sync/sync.c index 96972ab..e96ca23 100644 --- a/src/backend/storage/sync/sync.c +++ b/src/backend/storage/sync/sync.c @@ -379,7 +379,7 @@ ProcessSyncRequests(void) * all. (We delay checking until this point so that changing fsync on * the fly behaves sensibly.) */ - if (enableFsync) + if (enableFsync && push_standby) { /* * If in checkpointer, we want to absorb pending requests every so diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index e443aed..8c3418f 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -1155,13 +1155,15 @@ XLogRecPtr He3DBQueryMinLsnFromAllStanby() { int i; XLogRecPtr minApplyLsn = 0; - int procpid = -1; + int *procpids; + int maxid = 0; + procpids = (int *) malloc(max_wal_senders * sizeof(int)); for (i = 0; i < NumBackendStatSlots; i++) { - if (strcmp(BackendStatusArray[i].st_appname, "pgmirror") == 0) + if (strcmp(BackendStatusArray[i].st_appname, "pgmirror") == 0 || memcmp(BackendStatusArray[i].st_appname, "priv", 4) == 0) { - procpid = BackendStatusArray[i].st_procpid; - break; + procpids[maxid] = BackendStatusArray[i].st_procpid; + maxid++; } } Assert(WalSndCtl != NULL); @@ -1180,12 +1182,23 @@ XLogRecPtr He3DBQueryMinLsnFromAllStanby() pid = walsnd->pid; apply = walsnd->apply; SpinLockRelease(&walsnd->mutex); - if (pid != procpid) + int j; + bool exist = false; + for (j = 0; j < maxid; j++) + { + if (pid == procpids[j]) + { + exist = true; + break; + } + } + if (!exist) { if (apply < minApplyLsn || minApplyLsn == 0) minApplyLsn = apply; } } + free(procpids); return minApplyLsn; } \ No newline at end of file diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0452dcf..3c15903 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -607,6 +607,8 @@ char *pgstat_temp_directory; char *application_name; bool push_standby = false; +bool he3share = true; +bool mpush = false; bool he3_point_in_time_recovery; bool he3mirror = false; bool pgmirror = false; @@ -2137,6 +2139,24 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"he3share", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Sets storage is shared if he3share is configured true."), + }, + &he3share, + true, + NULL, NULL, NULL + }, + + { + {"mpush", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, + gettext_noop("Sets push_standby is belong to master if mpush is configured true."), + }, + &mpush, + false, + NULL, NULL, NULL + }, + { {"he3_point_in_time_recovery", PGC_SIGHUP, WAL_ARCHIVE_RECOVERY, gettext_noop("Sets whether we are in he3 pitr"), diff --git a/src/include/postmaster/secondbuffer.h b/src/include/postmaster/secondbuffer.h index 5258d42..d2964fc 100644 --- a/src/include/postmaster/secondbuffer.h +++ b/src/include/postmaster/secondbuffer.h @@ -119,7 +119,7 @@ extern void storeWalInLocalBuffer(kvStruct *ks,int32 length); extern void ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer); // when evict one page out databuffer, we should call this to store the page. extern void GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd); // async delete old version page and wal. we should call this when move page from ld/sdb to db. extern Bufrd GetWalFromLd(PageKey *pk); -extern Bufrd GetWalFromLocalBuffer(WalLdPageKey *pk); +extern Bufrd GetWalFromLocalBuffer(WalLdPageKey *pk, uint64_t replyLsn); extern void AddOneItemToDPArray(OriginDPageKey pk); extern void SecondBufferMain(void); extern void ClosePageDBEnv(void); @@ -128,3 +128,4 @@ extern void CloseWalEnv(void); extern void CreateSecondBufferLWLocks(void); extern Size SecondBufferLWLockShmemSize(void); extern Size SecondBufferShmemSize(void); +extern uint64_t SwapLsn(uint64_t lsn); diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index c03177e..6d983dd 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -277,6 +277,8 @@ extern PGDLLIMPORT bool push_standby; extern PGDLLIMPORT bool he3_point_in_time_recovery; extern PGDLLIMPORT bool he3mirror; extern PGDLLIMPORT bool pgmirror; +extern PGDLLIMPORT bool he3share; +extern PGDLLIMPORT bool mpush; extern int tcp_keepalives_idle;