xlog read batch

Code Source From: Self Code
Description:  【Optional】
Jira:  #【Optional】
市场项目编号(名称):【Optional】
This commit is contained in:
shenzhengntu 2023-03-29 19:48:30 +08:00
parent 0c95e67dd9
commit 7c31fdf2e8
6 changed files with 71 additions and 7 deletions

View File

@ -18,7 +18,7 @@ static void getWalFunc(gpointer data, gpointer user_data) {
//elem->status = STARTSTATUS;
int r;
clock_t start = clock();
r = batchRead((uint8_t *) elem->data, ThisTimeLineID, elem->startLsn, walStoreToLocal);
r = batchRead((uint8_t *) elem->data, ThisTimeLineID, elem->startLsn, elem->endLsn, walStoreToLocal);
clock_t end = clock();
printf("====LSN %X/%X==pid %d==len %d===time %u\n",LSN_FORMAT_ARGS(elem->startLsn),pthread_self(),r,end-start);
elem->dataLen = r;

View File

@ -24,6 +24,7 @@ wal_batch_t *ring_buffer_queue(ring_buffer_t *buffer, wal_batch_t data) {
return NULL;
}
buffer->buffer[buffer->head_index].startLsn = data.startLsn;
buffer->buffer[buffer->head_index].endLsn = data.endLsn;
pg_atomic_exchange_u32(&buffer->buffer[buffer->head_index].status,(uint32_t)UNKOWNSTATUS);
curWal = &buffer->buffer[buffer->head_index];
buffer->head_index = ((buffer->head_index + 1) & RING_BUFFER_MASK(buffer));

View File

@ -14328,7 +14328,7 @@ retry:
if (EnableHotStandby && !push_standby)
walStoreToLocal = true;
r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff, walStoreToLocal);
r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff,targetOff+8192,walStoreToLocal);
pgstat_report_wait_end();
/*
@ -14483,6 +14483,7 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
for(;start < end;start += tlen) {
wal_batch_t walElem;
walElem.startLsn = start;
walElem.endLsn = start + tlen;
wal_batch_t *curElem = NULL;
while((curElem = ring_buffer_queue(gRingBufferManger,walElem)) == NULL) {
usleep(10);

View File

@ -841,7 +841,8 @@ He3DBReadBufferWithoutRelcache_replay(RelFileNode rnode, ForkNumber forkNum,
return He3DBReadBuffer_replay(smgr, RELPERSISTENCE_PERMANENT, forkNum, blockNum,
mode, strategy, hit);
}
FILE *fp = NULL;
/*
* ReadBuffer_common -- common logic for all ReadBuffer variants
*
@ -892,12 +893,31 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
relpath(smgr->smgr_rnode, forkNum),
P_NEW)));
}
if (fp == NULL) {
char filename[128]={0};
if (EnableHotStandby) {
snprintf(filename,sizeof(filename),"/home/postgres/sz/standby_%d.txt",getpid());
} else {
snprintf(filename,sizeof(filename),"/home/postgres/sz/primary_%d.txt",getpid());
}
fp = fopen(filename,"w+");
}
if (isLocalBuf)
{
bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
if (found)
if (found) {
char pageLsn[128] = {0};
int ret = 0;
if (EnableHotStandby) {
ret = snprintf(pageLsn,sizeof(pageLsn),"LocalBuf_page_standby_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
} else {
ret = snprintf(pageLsn,sizeof(pageLsn),"LocalBuf_page_primary_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
}
fwrite(pageLsn,ret,1,fp);
fflush(fp);
pgBufferUsage.local_blks_hit++;
}
else if (isExtend)
pgBufferUsage.local_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
@ -912,8 +932,19 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*/
bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
strategy, &found,&exist);
if (found)
if (found) {
char pageLsn[128] = {0};
int ret = 0;
if (EnableHotStandby) {
ret = snprintf(pageLsn,sizeof(pageLsn),"page_standby_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
} else {
ret = snprintf(pageLsn,sizeof(pageLsn),"page_primary_%d_%d_%d_%d_%x_%d\n",smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)BufHdrGetBlock(bufHdr))->pd_lsn),PageGetMaxOffsetNumber(BufHdrGetBlock(bufHdr)));
}
fwrite(pageLsn,ret,1,fp);
fflush(fp);
pgBufferUsage.shared_blks_hit++;
}
else if (isExtend)
pgBufferUsage.shared_blks_written++;
else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
@ -1216,16 +1247,46 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
char *xlogStart = NULL;
char prefix[100] = {0};
if (pageXlogPtr != NULL) {
xlogStart = pageXlogPtr + BLCKSZ;
nbytes = nbytes - BLCKSZ;
memcpy(prefix,"pageXlogPtr",strlen("pageXlogPtr"));
} else if (tWalRecord.buf != NULL) {
xlogStart = tWalRecord.buf;
nbytes = tWalRecord.count;
memcpy(prefix,"tWalRecord",strlen("pageXlogPtr"));
} else {
memcpy(prefix,"walRecord",strlen("pageXlogPtr"));
xlogStart = walRecord.buf;
nbytes = walRecord.count;
}
char str[2048000] = {0};
int ret = snprintf(str,sizeof(str),"%s_%lx_standby_%d_%d_%d_%d_%x:",prefix,replayLsn,smgr->smgr_rnode.node.dbNode,smgr->smgr_rnode.node.relNode,forkNum,blockNum,PageXLogRecPtrGet(((PageHeader)bufBlock)->pd_lsn));
XLogRecord *record;
int len = nbytes;
int pos = 0;
LsnNode*lsnPtr = NULL;
if (head!=NULL) {
LsnNode*lsnPtr = head->next;
}
if (xlogStart != NULL) {
while(len > 0) {
record = (XLogRecord *) (xlogStart+pos);
int onelen;
if (lsnPtr!=NULL) {
onelen = snprintf(str+ret,sizeof(str)-ret,"%x-lsnPtr-%x,",record->xl_end,lsnPtr->lsn);
lsnPtr = lsnPtr->next;
} else {
onelen = snprintf(str+ret,sizeof(str)-ret,"%x-%x,",record->xl_end,0);
}
ret += onelen;
len -= record->xl_tot_len;
pos += record->xl_tot_len;
}
}
str[ret] = '\n';
fwrite(str,ret+1,1,fp);
he3db_apply_page(bufHdr, xlogStart, nbytes);
if (pageXlogPtr != NULL) {
free(pageXlogPtr);

View File

@ -58,6 +58,7 @@ typedef enum BufferStatus{
typedef struct wal_batch_t {
XLogRecPtr startLsn;
XLogRecPtr endLsn;
int dataLen;
pg_atomic_uint32 status;
char* data;

View File

@ -63,7 +63,7 @@ extern Bufrd dataRead(int64_t fd,
extern void free_dataRead(uint8_t *buf, size_t count, size_t cap);
extern Bufrd readfs(int64_t fd, int64_t offset, uint32_t size);
extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr, bool needStore);
extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr,uint64_t endPtr, bool needStore);
extern uint8_t kvwrite(XLogItem *xlogItem);
extern uint8_t flushwals(XLogItem *xlogItem, uint32_t timeline);
extern uint8_t kvflush(XLogRecPtr lsn);