walrecord query

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-03-30 10:35:20 +08:00
parent 331617c09f
commit cb3bc1eb79
2 changed files with 14 additions and 66 deletions

View File

@ -123,10 +123,19 @@ void InitRingBufferSpace(void) {
}
int walRecordQuery(char**buffer,int* curpos,int* maxspace,uint64 lsn) {
if (gRingBufferManger->maxIdx == 0) {
ring_buffer_size_t maxIdx = gRingBufferManger->maxIdx;
if (maxIdx == 0) {
return -1;
}
int low = 0,high = gRingBufferManger->maxIdx, mid = 0;
ring_buffer_size_t tailIdx = gRingBufferManger->tail_index;
int low = tailIdx,high = ((tailIdx+maxIdx) & RING_BUFFER_MASK(buffer)), mid = 0;
if (low > high) {
if (gRingBufferManger->buffer[0].startLsn!=0 && gRingBufferManger->buffer[0].startLsn <= lsn) {
low = 0;
} else {
high = gRingBufferManger->buffer_mask+1;
}
}
if (gRingBufferManger->buffer[high-1].startLsn == 0) {
high -= 2;
} else {

View File

@ -842,7 +842,6 @@ He3DBReadBufferWithoutRelcache_replay(RelFileNode rnode, ForkNumber forkNum,
mode, strategy, hit);
}
FILE *fp = NULL;
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
*
@ -893,31 +892,12 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
relpath(smgr->smgr_rnode, forkNum),
P_NEW)));
}
if (fp == NULL) {
char filename[128]={0};
if (EnableHotStandby) {
snprintf(filename,sizeof(filename),"/home/postgres/sz/standby_%d.txt",getpid());
} else {
snprintf(filename,sizeof(filename),"/home/postgres/sz/primary_%d.txt",getpid());
}
fp = fopen(filename,"w+");
}
if (isLocalBuf)
{
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
if (found) {
char pageLsn[128] = {0};
int ret = 0;
if (EnableHotStandby) {
ret = snprintf(pageLsn,sizeof(pageLsn),"LocalBuf_page_standby_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
} else {
ret = snprintf(pageLsn,sizeof(pageLsn),"LocalBuf_page_primary_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
}
fwrite(pageLsn,ret,1,fp);
fflush(fp);
if (found)
pgBufferUsage.local_blks_hit++;
}
else if (isExtend)
pgBufferUsage.local_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
@ -932,19 +912,8 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
strategy, &found,&exist);
if (found) {
char pageLsn[128] = {0};
int ret = 0;
if (EnableHotStandby) {
ret = snprintf(pageLsn,sizeof(pageLsn),"page_standby_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
} else {
ret = snprintf(pageLsn,sizeof(pageLsn),"page_primary_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
}
fwrite(pageLsn,ret,1,fp);
fflush(fp);
if (found)
pgBufferUsage.shared_blks_hit++;
}
else if (isExtend)
pgBufferUsage.shared_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
@ -1247,46 +1216,16 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
char *xlogStart = NULL;
char prefix[100] = {0};
if (pageXlogPtr != NULL) {
xlogStart = pageXlogPtr + BLCKSZ;
nbytes = nbytes - BLCKSZ;
memcpy(prefix,"pageXlogPtr",strlen("pageXlogPtr"));
} else if (tWalRecord.buf != NULL) {
xlogStart = tWalRecord.buf;
nbytes = tWalRecord.count;
memcpy(prefix,"tWalRecord",strlen("pageXlogPtr"));
} else {
memcpy(prefix,"walRecord",strlen("pageXlogPtr"));
xlogStart = walRecord.buf;
nbytes = walRecord.count;
}
char str[2048000] = {0};
int ret = snprintf(str,sizeof(str),"%s_%lx_standby_%d_%d_%d_%d_%x:",prefix,replayLsn,smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)bufBlock)->pd_lsn));
XLogRecord *record;
int len = nbytes;
int pos = 0;
LsnNode*lsnPtr = NULL;
if (head!=NULL) {
LsnNode*lsnPtr = head->next;
}
if (xlogStart != NULL) {
while(len > 0) {
record = (XLogRecord *) (xlogStart+pos);
int onelen;
if (lsnPtr!=NULL) {
onelen = snprintf(str+ret,sizeof(str)-ret,"%x-lsnPtr-%x,",record->xl_end,lsnPtr->lsn);
lsnPtr = lsnPtr->next;
} else {
onelen = snprintf(str+ret,sizeof(str)-ret,"%x-%x,",record->xl_end,0);
}
ret += onelen;
len -= record->xl_tot_len;
pos += record->xl_tot_len;
}
}
str[ret] = '\n';
fwrite(str,ret+1,1,fp);
he3db_apply_page(bufHdr, xlogStart, nbytes);
if (pageXlogPtr != NULL) {
free(pageXlogPtr);