mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-02 04:07:34 +08:00
!127 make pg_waldump work
Merge pull request !127 from zoujia_yewu/dev_performance
This commit is contained in:
commit
758de265bf
@ -57,7 +57,7 @@
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/startup.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "replication/basebackup.h"
|
||||
#include "replication/logical.h"
|
||||
#include "replication/origin.h"
|
||||
@ -80,7 +80,6 @@
|
||||
#include "storage/sync.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/guc.h"
|
||||
// #include "utils/hfs.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/ps_status.h"
|
||||
#include "utils/relmapper.h"
|
||||
|
@ -1931,6 +1931,8 @@ err:
|
||||
// return true;
|
||||
// }
|
||||
|
||||
|
||||
|
||||
int
|
||||
He3DBWALRead(XLogReaderState *state,
|
||||
XLogRecPtr startptr, int count, char *buf)
|
||||
@ -1941,7 +1943,7 @@ He3DBWALRead(XLogReaderState *state,
|
||||
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
|
||||
#endif
|
||||
|
||||
nbytes = batchRead((uint8_t *) buf, state->currTLI, startptr, startptr+16384, false);
|
||||
nbytes = batchReadForTools((uint8_t *) buf, state->currTLI, startptr, startptr+16384, false);
|
||||
|
||||
#ifndef FRONTEND
|
||||
pgstat_report_wait_end();
|
||||
@ -1952,6 +1954,7 @@ He3DBWALRead(XLogReaderState *state,
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
|
||||
/* ----------------------------------------
|
||||
* Functions for decoding the data and block references in a record.
|
||||
* ----------------------------------------
|
||||
|
@ -37,7 +37,7 @@
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/startup.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "replication/walreceiver.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/bufpage.h"
|
||||
|
@ -114,7 +114,7 @@
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/pgarch.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "postmaster/syslogger.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/walsender.h"
|
||||
|
@ -4,10 +4,53 @@
|
||||
#include <unistd.h>
|
||||
#include <lmdb.h>
|
||||
#include <pthread.h>
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include <stdio.h>
|
||||
#include "utils/guc.h"
|
||||
|
||||
#include <stdbool.h>
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "storage/s_lock.h"
|
||||
#include "storage/spin.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "miscadmin.h"
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
|
||||
|
||||
typedef struct SingleKeyArray
|
||||
{
|
||||
SdPageKey SdPageKeyList[SDLEN];
|
||||
uint16 head;
|
||||
uint16 tail;
|
||||
uint16 unused;
|
||||
slock_t oplock;
|
||||
} SingleKeyArray;
|
||||
|
||||
typedef struct Statisticnum
|
||||
{
|
||||
double totalunused;
|
||||
slock_t change;
|
||||
} Statisticnum;
|
||||
|
||||
typedef struct DPageKeyArray
|
||||
{
|
||||
DPageKey dpk[1024];
|
||||
uint16 unused;
|
||||
uint16 head;
|
||||
uint16 tail;
|
||||
uint16 pageIndex;
|
||||
uint16 walIndex;
|
||||
slock_t append;
|
||||
} DPageKeyArray;
|
||||
/*
|
||||
secondbufferhash code
|
||||
*/
|
||||
@ -165,7 +208,6 @@ InitSecondBufferHash(void)
|
||||
void
|
||||
InitPageDBEnv()
|
||||
{
|
||||
// ereport(LOG, (errmsg("InitPageDBEnv in processing")));
|
||||
pg_mkdir_p(lmdb_page_directory,0777);
|
||||
mdb_env_create(&pageEnv);
|
||||
mdb_env_set_maxreaders(pageEnv,MAXREADERS);
|
||||
@ -174,11 +216,9 @@ InitPageDBEnv()
|
||||
mdb_txn_begin(pageEnv, NULL, 0, &pageTxn);
|
||||
mdb_dbi_open(pageTxn,NULL,MDB_CREATE,&pageDbi);
|
||||
mdb_txn_commit(pageTxn);
|
||||
// ereport(LOG, (errmsg("InitPageDBEnv is done")));
|
||||
}
|
||||
void InitWalDBEnv()
|
||||
{
|
||||
// ereport(LOG, (errmsg("InitWalDBEnv in processing")));
|
||||
pg_mkdir_p(lmdb_wal_directory,0777);
|
||||
mdb_env_create(&walEnv);
|
||||
mdb_env_set_maxreaders(walEnv,MAXREADERS);
|
||||
@ -187,22 +227,17 @@ void InitWalDBEnv()
|
||||
mdb_txn_begin(walEnv, NULL, 0, &walTxn);
|
||||
mdb_dbi_open(walTxn,NULL,MDB_CREATE|MDB_DUPSORT,&walDbi);
|
||||
mdb_txn_commit(walTxn);
|
||||
// ereport(LOG, (errmsg("InitWalDBEnv is done")));
|
||||
}
|
||||
void ClosePageDBEnv()
|
||||
{
|
||||
// ereport(LOG, (errmsg("ClosePageDBEnv in processing")));
|
||||
mdb_dbi_close(pageEnv, pageDbi);
|
||||
mdb_env_close(pageEnv);
|
||||
// ereport(LOG, (errmsg("ClosePageDBEnv in done")));
|
||||
}
|
||||
|
||||
void CloseWalEnv()
|
||||
{
|
||||
// ereport(LOG, (errmsg("CloseWalEnv in processing")));
|
||||
mdb_dbi_close(pageEnv, pageDbi);
|
||||
mdb_env_close(pageEnv);
|
||||
// ereport(LOG, (errmsg("CloseWalEnv in done")));
|
||||
}
|
||||
|
||||
static void
|
||||
@ -224,7 +259,6 @@ convertKeyLd(LdPageKey *ldkey, PageKey *pk)
|
||||
sdkey.blkno = pk->blkNo;
|
||||
|
||||
ldkey->sk =sdkey;
|
||||
// ldkey->pageLsn = pk->replyLsn;
|
||||
|
||||
}
|
||||
|
||||
@ -282,21 +316,17 @@ CleanUpSecondBuffer(const SdPageKey *pk)
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
bool found;
|
||||
// ereport(LOG, (errmsg("CleanUpSecondBuffer in processing")));
|
||||
hash_search(SecondBufferHash,
|
||||
(void *)pk,
|
||||
HASH_REMOVE,
|
||||
&found);
|
||||
LWLockRelease(partitionLock);
|
||||
// ereport(LOG, (errmsg("CleanUpSecondBuffer done")));
|
||||
return found;
|
||||
}
|
||||
|
||||
static SdPageValue *
|
||||
FindSecondBufferInTable(const SdPageKey *pk)
|
||||
{
|
||||
|
||||
// ereport(LOG, (errmsg("FindSecondBufferInTable in processing")));
|
||||
SdPageValue *pv;
|
||||
bool found;
|
||||
if (SecondBufferHash == NULL)
|
||||
@ -308,7 +338,6 @@ FindSecondBufferInTable(const SdPageKey *pk)
|
||||
pk,
|
||||
HASH_FIND,
|
||||
&found);
|
||||
// ereport(LOG, (errmsg("FindSecondBufferInTable done")));
|
||||
if (!found)
|
||||
{
|
||||
return NULL;
|
||||
@ -320,8 +349,6 @@ FindSecondBufferInTable(const SdPageKey *pk)
|
||||
void
|
||||
ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
|
||||
{
|
||||
// ereport(LOG, (errmsg("ReceivePageFromDataBuffer in processing")));
|
||||
// printf("ReceivePageFromDataBuffer in processing\n");
|
||||
SdPageKey *sk;
|
||||
sk = (SdPageKey *)malloc(sizeof(SdPageKey));
|
||||
convertKey(sk,pk);
|
||||
@ -341,7 +368,6 @@ ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
|
||||
|
||||
for (;;)
|
||||
{
|
||||
// printf("index is %d\n",index);
|
||||
sa = &MultiKeyArrays[index];
|
||||
SpinLockAcquire(&sa->oplock);
|
||||
if (sa->unused > 0)
|
||||
@ -672,8 +698,6 @@ storeWalInLocalBuffer(kvStruct *ks,int32 length)
|
||||
void
|
||||
GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
|
||||
{
|
||||
ereport(LOG, (errmsg("GetPageFromCurrentNode in processing")));
|
||||
|
||||
uint8_t *page;
|
||||
page = NULL;
|
||||
|
||||
@ -709,7 +733,6 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
|
||||
wlpk.pageLsn = pk.pageLsn;
|
||||
wlpk.partition = 0;
|
||||
Bufrd waldata = GetWalFromLocalBuffer(&pk);
|
||||
// memcpy(bufrd->buf,page,8192);
|
||||
if (waldata.count > 0)
|
||||
{
|
||||
bufrd->buf = (uint8_t *)realloc(bufrd->buf,8192 + waldata.count);
|
||||
@ -719,7 +742,6 @@ GetPageFromCurrentNode(PageKey pk,Bufrd *bufrd)
|
||||
free(waldata.buf);
|
||||
}
|
||||
}
|
||||
ereport(LOG, (errmsg("GetPageFromCurrentNode done")));
|
||||
}
|
||||
|
||||
|
||||
@ -843,7 +865,6 @@ RemovePageOrWalFromCurrentNode()
|
||||
static void
|
||||
MovePageFromSecondBufferToLocalBuffer()
|
||||
{
|
||||
// ereport(INFO,(errmsg("MovePageFromSecondBufferToLocalBuffer in processing")));
|
||||
MDB_txn *tmptxn;
|
||||
MDB_txn *txn = NULL;
|
||||
MDB_dbi dbi;
|
||||
@ -924,10 +945,8 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
|
||||
|
||||
newHash = SecondBufferHashCode(&spk);
|
||||
// printf("getpagefromsecondbuffer newhash is %d\n",newHash%NUM_LOCK_PARTITIONS);
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_SHARED);
|
||||
// ereport(INFO,errmsg("mtol %d,%d,%d,%d\n",spk.dbid,spk.relid,spk.forkno,spk.blkno));
|
||||
spv = FindSecondBufferInTable(&spk);
|
||||
if (spv == NULL || spv->pagecontent == NULL)
|
||||
{
|
||||
@ -967,10 +986,6 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
{
|
||||
CleanUpSecondBuffer(&s->spk);
|
||||
}
|
||||
|
||||
|
||||
// ereport(INFO,errmsg("CleanUpSecondBuffer is ok"));
|
||||
|
||||
if (spkl.head->next != NULL)
|
||||
{
|
||||
spkl.head = spkl.head->next;
|
||||
@ -981,7 +996,6 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
free(spkl.head);
|
||||
break;
|
||||
}
|
||||
// ereport(INFO,errmsg("double free"));
|
||||
}
|
||||
|
||||
spkl.head = spkl.tail = NULL;
|
||||
@ -1012,7 +1026,6 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
if (rate < 0.1)
|
||||
{
|
||||
sleeptime = 0;
|
||||
printf("RED ALERT: the rate of totalunused is low: %lf\n",rate);
|
||||
}
|
||||
else if (rate > 0.6)
|
||||
{
|
||||
@ -1026,7 +1039,6 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
|
||||
|
||||
}
|
||||
// ereport(INFO, (errmsg("MovePageFromSecondBufferToLocalBuffer done")));
|
||||
}
|
||||
|
||||
void SignalStartSecondBuffer(void) {
|
||||
@ -1062,21 +1074,6 @@ SecondBufferMain(void)
|
||||
|
||||
|
||||
MovePageFromSecondBufferToLocalBuffer();
|
||||
// for(;;)
|
||||
// {
|
||||
// pg_usleep(10000000L);
|
||||
// }
|
||||
// pthread_t sdbToLb;
|
||||
// pthread_t cleanupLb;
|
||||
|
||||
|
||||
// pthread_create(&sdbToLb,NULL,MovePageFromSecondBufferToLocalBuffer,NULL);
|
||||
// pthread_create(&cleanupLb,NULL,RemovePageOrWalFromCurrentNode,NULL);
|
||||
// pthread_join(sdbToLb,NULL);
|
||||
// pthread_join(cleanupLb,NULL);
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -97,7 +97,7 @@
|
||||
#include "pgstat.h"
|
||||
#include "port/pg_iovec.h"
|
||||
#include "portability/mem.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/spin.h"
|
||||
|
@ -29,7 +29,7 @@
|
||||
#include "postmaster/bgworker_internals.h"
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/origin.h"
|
||||
#include "replication/slot.h"
|
||||
|
@ -33,7 +33,7 @@
|
||||
#include "pg_trace.h"
|
||||
#include "pgstat.h"
|
||||
#include "postmaster/bgwriter.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/fd.h"
|
||||
@ -739,7 +739,6 @@ void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char **buffer, XLogRecPtr lsn)
|
||||
{
|
||||
ereport(LOG,errmsg("he3db_mdread_pagexlog int processing"));
|
||||
off_t seekpos;
|
||||
int nbytes;
|
||||
MdfdVec *v;
|
||||
@ -779,8 +778,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
|
||||
odpk.pk = pageKey;
|
||||
odpk.opration = (int)EVICT;
|
||||
// ereport(LOG,errmsg("he3db_mdread_pagexlog done"));
|
||||
// bufrd = MoveOnePageToMemory(pageKey);
|
||||
|
||||
GetPageFromCurrentNode(pageKey, bufrd);
|
||||
count = bufrd->count;
|
||||
}
|
||||
@ -790,7 +788,6 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
*buffer = bufrd->buf;
|
||||
free(bufrd);
|
||||
AddOneItemToDPArray(odpk);
|
||||
ereport(LOG,errmsg("1,he3db_mdread_pagexlog int done"));
|
||||
return count;
|
||||
}
|
||||
else
|
||||
@ -823,7 +820,6 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
|
||||
if (push_standby)
|
||||
{
|
||||
ereport(LOG,errmsg("he3db_mdread_pagexlog int done, push_standby"));
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
@ -863,7 +859,6 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
||||
ReceivePageFromDataBuffer(&pageKey, *buffer);
|
||||
}
|
||||
FreeLsnNode(head);
|
||||
ereport(LOG,errmsg("he3db_mdread_pagexlog int done, push"));
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@
|
||||
#include "storage/md.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/filecache.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
//#include "utils/hfs.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/inval.h"
|
||||
|
@ -74,7 +74,7 @@
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/syslogger.h"
|
||||
#include "postmaster/walwriter.h"
|
||||
#include "postmaster/seconfbuffer.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "replication/logicallauncher.h"
|
||||
#include "replication/reorderbuffer.h"
|
||||
#include "replication/slot.h"
|
||||
|
@ -23,6 +23,7 @@ SUBDIRS = \
|
||||
pg_controldata \
|
||||
pg_ctl \
|
||||
pg_dump \
|
||||
pg_waldump \
|
||||
pg_resetwal \
|
||||
pg_test_fsync \
|
||||
pg_test_timing \
|
||||
|
@ -1,25 +1,8 @@
|
||||
#include "postgres.h"
|
||||
#include <lmdb.h>
|
||||
#include <stdbool.h>
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "libpq/pqsignal.h"
|
||||
#include "storage/s_lock.h"
|
||||
#include "storage/spin.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/bufpage.h"
|
||||
#include "storage/pmsignal.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "utils/dynahash.h"
|
||||
#include "utils/hfs.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
//#include <c.h>
|
||||
#include <lmdb.h>
|
||||
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <time.h>
|
||||
|
||||
#define MAXREADERS 512
|
||||
#define MAPSIE (uint64)1<<40
|
||||
@ -69,7 +52,6 @@ typedef struct SdPageKeyList
|
||||
typedef struct LdPageKey
|
||||
{
|
||||
SdPageKey sk;
|
||||
// uint64 pageLsn;
|
||||
} LdPageKey;
|
||||
|
||||
typedef struct WalLdPageKey
|
||||
@ -91,20 +73,7 @@ typedef struct SdPageValue
|
||||
uint8 pagecontent[BLKSZ];
|
||||
} SdPageValue;
|
||||
|
||||
typedef struct SingleKeyArray
|
||||
{
|
||||
SdPageKey SdPageKeyList[SDLEN];
|
||||
uint16 head;
|
||||
uint16 tail;
|
||||
uint16 unused;
|
||||
slock_t oplock;
|
||||
} SingleKeyArray;
|
||||
|
||||
typedef struct Statisticnum
|
||||
{
|
||||
double totalunused;
|
||||
slock_t change;
|
||||
} Statisticnum;
|
||||
|
||||
typedef struct DPageKey
|
||||
{
|
||||
@ -113,16 +82,7 @@ typedef struct DPageKey
|
||||
uint8_t operation;
|
||||
} DPageKey;
|
||||
|
||||
typedef struct DPageKeyArray
|
||||
{
|
||||
DPageKey dpk[1024];
|
||||
uint16 unused;
|
||||
uint16 head;
|
||||
uint16 tail;
|
||||
uint16 pageIndex;
|
||||
uint16 walIndex;
|
||||
slock_t append;
|
||||
} DPageKeyArray;
|
||||
|
||||
|
||||
typedef struct kvStruct {
|
||||
LdPageKey lpk;
|
||||
@ -131,7 +91,7 @@ typedef struct kvStruct {
|
||||
uint64_t lsn;
|
||||
} kvStruct;
|
||||
|
||||
extern SingleKeyArray *MultiKeyArrays;
|
||||
//extern SingleKeyArray *MultiKeyArrays;
|
||||
|
||||
extern MDB_env *pageEnv;
|
||||
extern MDB_env *walEnv;
|
@ -67,6 +67,7 @@ extern void free_dataRead(uint8_t *buf, size_t count, size_t cap);
|
||||
|
||||
extern Bufrd readfs(int64_t fd, int64_t offset, uint32_t size);
|
||||
extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr,uint64_t endPtr, bool needStore);
|
||||
extern int batchReadForTools(uint8_t *buf, uint32_t timeline, uint64_t startPtr,uint64_t endPtr, bool needStore);
|
||||
extern uint8_t kvwrite(XLogItem *xlogItem);
|
||||
extern uint8_t flushwals(XLogItem *xlogItem, uint32_t timeline);
|
||||
extern uint8_t kvflush(XLogRecPtr lsn);
|
||||
|
Loading…
Reference in New Issue
Block a user