wal query in mempry

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-03-28 16:32:40 +08:00
parent 261b092d84
commit 7c4e58e191
3 changed files with 23 additions and 18 deletions

View File

@ -151,7 +151,7 @@ int walRecordQuery(char**buffer,int* curpos,int* maxspace,uint64 lsn) {
xllen = record->xl_tot_len;
} else {
record = (XLogRecord*)gRingBufferManger->buffer[high].data;
if (gRingBufferManger->buffer[mid].startLsn + gRingBufferManger->buffer[mid].dataLen >= lsn) {
if (gRingBufferManger->buffer[high].startLsn + gRingBufferManger->buffer[high].dataLen <= lsn) {
return -1;
} else {
record = (XLogRecord*)(gRingBufferManger->buffer[high].data + (lsn-gRingBufferManger->buffer[high].startLsn));

View File

@ -1056,7 +1056,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
!isLocalBuf) && IsBootstrapProcessingMode() != true && InitdbSingle != true)
{
if (EnableHotStandby == true || InRecovery) {
if (push_standby == false) {
if (EnableHotStandby == true && push_standby == false) {
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
@ -1065,7 +1065,9 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
GetXLogReplayRecPtr(&tli);
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
if (head->next!=NULL) {
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
}
} else {
walRecord.cap = 8192;
walRecord.buf = malloc(walRecord.cap);
@ -1116,8 +1118,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
pageTag.blockNum = blockNum;
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
if (push_standby == false) {
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
if (EnableHotStandby == true && push_standby == false) {
if (head->next != NULL) {
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
}
} else {
walRecord.cap = 8192;
walRecord.buf = malloc(walRecord.cap);
@ -1226,11 +1230,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (pageXlogPtr != NULL) {
free(pageXlogPtr);
pageXlogPtr = NULL;
} else if (tWalRecord.buf != NULL) {
} else if (tWalRecord.count != 0) {
free_dataRead(tWalRecord.buf,tWalRecord.count,tWalRecord.cap);
FreeLsnNode(head);
} else {
free(walRecord.buf);
if (walRecord.buf != NULL) {
free(walRecord.buf);
}
FreeLsnNode(head);
}
}

View File

@ -777,17 +777,18 @@ he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu
Assert(bufrd.count == BLCKSZ);
pageKey.pageLsn = PageGetLSN(*buffer);
LsnNode *head = GetLogIndexByPage(&pageTag, pageKey.pageLsn, pageKey.replyLsn);
Bufrd result;
result = ReadWalsByPage(pageKey.relfileNode.dbNode, pageKey.relfileNode.relNode,
pageKey.forkNo, pageKey.blkNo, ThisTimeLineID, head);
if (result.count !=0) {
if (head->next != NULL) {
Bufrd result;
result = ReadWalsByPage(pageKey.relfileNode.dbNode, pageKey.relfileNode.relNode,
pageKey.forkNo, pageKey.blkNo, ThisTimeLineID, head);
*buffer = (uint8_t *)realloc(*buffer, BLCKSZ + result.count);
strcat(*buffer,result.buf);
free_dataRead(result.buf, result.count, result.cap);
return BLCKSZ + result.count;
}
//TODO free result
FreeLsnNode(head);
return BLCKSZ + result.count;
return bufrd.count;
}
// *buffer = bufrd.buf;
return bufrd.count;
@ -833,12 +834,10 @@ he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknu
result = ReadWalsByPage(pageKey.relfileNode.dbNode,pageKey.relfileNode.relNode,
pageKey.forkNo,pageKey.blkNo, ThisTimeLineID, head);
}
if (result.count !=0) {
buf = (uint8_t *)realloc(buf, BLCKSZ + result.count);
strcat(buf,result.buf);
//TODO free result
free_dataRead(result.buf, result.count, result.cap);
}
buf = (uint8_t *)realloc(buf, BLCKSZ + result.count);
strcat(buf,result.buf);
//TODO free result
free_dataRead(result.buf, result.count, result.cap);
*buffer = buf;
FreeLsnNode(head);
return BLCKSZ + result.count;