mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-12-01 19:58:06 +08:00
Merge remote-tracking branch 'upstream/dev_performance' into dev_performance
This commit is contained in:
commit
2a77080888
@ -93,7 +93,7 @@ max_connections = 100 # (change requires restart)
|
||||
# - Authentication -
|
||||
|
||||
#authentication_timeout = 1min # 1s-600s
|
||||
#password_encryption = scram-sha-256 # scram-sha-256 or md5
|
||||
|
||||
#db_user_namespace = off
|
||||
|
||||
# GSSAPI using Kerberos
|
||||
|
@ -93,7 +93,7 @@ max_connections = 100 # (change requires restart)
|
||||
# - Authentication -
|
||||
|
||||
#authentication_timeout = 1min # 1s-600s
|
||||
password_encryption = md5 # scram-sha-256 or md5
|
||||
|
||||
#db_user_namespace = off
|
||||
|
||||
# GSSAPI using Kerberos
|
||||
|
@ -91,6 +91,7 @@
|
||||
#include "access/xlog.h"
|
||||
#include "miscadmin.h"
|
||||
#include "port/pg_bitutils.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/smgr.h"
|
||||
@ -656,6 +657,18 @@ vm_extend(Relation rel, BlockNumber vm_nblocks)
|
||||
|
||||
smgrextend(rel->rd_smgr, VISIBILITYMAP_FORKNUM, vm_nblocks_now,
|
||||
pg.data, false);
|
||||
if (!(InitdbSingle || IsBootstrapProcessingMode() == true) && !push_standby && !he3mirror)
|
||||
{
|
||||
PageKey pageKey;
|
||||
pageKey.relfileNode.dbNode = rel->rd_smgr->smgr_rnode.node.dbNode;
|
||||
pageKey.relfileNode.relNode = rel->rd_smgr->smgr_rnode.node.relNode;
|
||||
|
||||
pageKey.blkNo = vm_nblocks_now;
|
||||
pageKey.forkNo = VISIBILITYMAP_FORKNUM;
|
||||
pageKey.pageLsn = 0;
|
||||
|
||||
ReceivePageFromDataBuffer(&pageKey, (uint8_t *) pg.data);
|
||||
}
|
||||
vm_nblocks_now++;
|
||||
}
|
||||
|
||||
|
@ -470,24 +470,29 @@ static HTAB *PageCountHash = NULL;
|
||||
static uint32_t curLatchPos = 0;
|
||||
|
||||
typedef struct {
|
||||
pg_atomic_uint32 gpushpos;
|
||||
pg_atomic_uint32 ready;
|
||||
pg_atomic_uint32 gpos;
|
||||
slock_t mutex;
|
||||
volatile uint32 gpushpos;
|
||||
volatile bool ready;
|
||||
volatile uint32 gpos;
|
||||
pg_atomic_uint32 latchPos;
|
||||
pg_atomic_uint32 taskNum;
|
||||
pg_atomic_uint32 modifyNum;
|
||||
uint32 modifyNum;
|
||||
Latch pageFlushWakeupLatch[PARALLEL_NUM];
|
||||
PageValue*gtag[G_QUEUE_LEN];
|
||||
}PageHashQueueShmemStruct;
|
||||
static PageHashQueueShmemStruct *PageHashQueueShmem;
|
||||
|
||||
void pushSlaveReplayQueue(int pageNum) {
|
||||
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
|
||||
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
|
||||
|
||||
if (PageHashQueueShmem->gpos != 0 && PageHashQueueShmem->ready == false) {
|
||||
SpinLockAcquire(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->ready = true;
|
||||
SpinLockRelease(&PageHashQueueShmem->mutex);
|
||||
WakeupFlushWork();
|
||||
}
|
||||
}
|
||||
|
||||
while(pageNum > CompletedTaskNum()) {
|
||||
usleep(10);
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
cleanMap();
|
||||
}
|
||||
@ -547,12 +552,15 @@ PageHashQueueShmemInit(void)
|
||||
|
||||
if (!found)
|
||||
{
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->ready,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->gpushpos,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->latchPos,0);
|
||||
SpinLockInit(&PageHashQueueShmem->mutex);
|
||||
SpinLockAcquire(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->ready = false;
|
||||
PageHashQueueShmem->gpushpos = 0;
|
||||
SpinLockRelease(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->gpos = 0;
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->taskNum,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->gpos, 0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->modifyNum,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->latchPos, 0);
|
||||
PageHashQueueShmem->modifyNum = 0;
|
||||
for (int i = 0;i<PARALLEL_NUM;i++) {
|
||||
InitSharedLatch(&PageHashQueueShmem->pageFlushWakeupLatch[i]);
|
||||
}
|
||||
@ -567,7 +575,6 @@ Size PageHashMapSize(void) {
|
||||
return RedoStartPointSize() + hash_estimate_size(G_QUEUE_LEN,sizeof(PageValue));
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
InitBufferPoolHashMap(void)
|
||||
{
|
||||
@ -605,11 +612,12 @@ uint32_t addFileKey(BufferTag*onePage) {
|
||||
&found);
|
||||
if (found == false) {
|
||||
result->num = 0;
|
||||
uint32_t gpos = pg_atomic_fetch_add_u32(&PageHashQueueShmem->gpos,1);
|
||||
uint32_t gpos = PageHashQueueShmem->gpos++;
|
||||
PageHashQueueShmem->gtag[gpos] = result;
|
||||
}
|
||||
result->num++;
|
||||
return pg_atomic_fetch_add_u32(&PageHashQueueShmem->modifyNum,1);
|
||||
PageHashQueueShmem->modifyNum++;
|
||||
return PageHashQueueShmem->modifyNum;
|
||||
}
|
||||
|
||||
void cleanMap(void) {
|
||||
@ -624,11 +632,13 @@ void cleanMap(void) {
|
||||
HASH_REMOVE, NULL) == NULL)
|
||||
elog(ERROR, "hash table corrupted");
|
||||
}
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->ready,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->gpos, 0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->gpushpos,0);
|
||||
SpinLockAcquire(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->ready = false;
|
||||
PageHashQueueShmem->gpushpos = 0;
|
||||
SpinLockRelease(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->gpos = 0;
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->taskNum,0);
|
||||
pg_atomic_init_u32(&PageHashQueueShmem->modifyNum,0);
|
||||
PageHashQueueShmem->modifyNum = 0;
|
||||
}
|
||||
|
||||
uint32_t hashMapSize(void) {
|
||||
@ -640,34 +650,31 @@ static int cmp(const void* a,const void* b) {
|
||||
}
|
||||
|
||||
void SortPageQueue(void) {
|
||||
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
|
||||
qsort(PageHashQueueShmem->gtag,pg_atomic_read_u32(&PageHashQueueShmem->gpos),sizeof(PageValue*),cmp);
|
||||
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
|
||||
if (PageHashQueueShmem->gpos != 0 && PageHashQueueShmem->ready == false) {
|
||||
qsort(PageHashQueueShmem->gtag,PageHashQueueShmem->gpos,sizeof(PageValue*),cmp);
|
||||
SpinLockAcquire(&PageHashQueueShmem->mutex);
|
||||
PageHashQueueShmem->ready = true;
|
||||
SpinLockRelease(&PageHashQueueShmem->mutex);
|
||||
WakeupFlushWork();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
BufferTag* QueuePushPage(void) {
|
||||
uint32_t gpushpos = pg_atomic_read_u32(&PageHashQueueShmem->gpushpos);
|
||||
uint32_t gpos = pg_atomic_read_u32(&PageHashQueueShmem->gpos);
|
||||
uint32_t ready = pg_atomic_read_u32(&PageHashQueueShmem->ready);
|
||||
if (ready == 0 ||
|
||||
gpushpos >= gpos){
|
||||
return NULL;
|
||||
}
|
||||
gpushpos = pg_atomic_fetch_add_u32(&PageHashQueueShmem->gpushpos,1);
|
||||
ready = pg_atomic_read_u32(&PageHashQueueShmem->ready);
|
||||
/* only ready 1 to 0 then 0 to 1 will produce a serious problem,
|
||||
reason functions of pushTikv trigger the next batch of data processing
|
||||
cost time less than (pg_atomic_read_u32 and pg_atomic_fetch_add_u32) */
|
||||
if (ready!= 0 && gpushpos < gpos) {
|
||||
return &(PageHashQueueShmem->gtag[gpushpos]->tag);
|
||||
} else {
|
||||
if (gpushpos < gpos) {
|
||||
elog(ERROR,"QueuePushPage gpushpos %d < gpos %d",gpushpos,gpos);
|
||||
uint32_t gpushpos;
|
||||
bool hasData = false;
|
||||
if (PageHashQueueShmem->ready == true) {
|
||||
SpinLockAcquire(&PageHashQueueShmem->mutex);
|
||||
if (PageHashQueueShmem->ready == true && PageHashQueueShmem->gpushpos < PageHashQueueShmem->gpos) {
|
||||
hasData = true;
|
||||
gpushpos = PageHashQueueShmem->gpushpos++;
|
||||
}
|
||||
SpinLockRelease(&PageHashQueueShmem->mutex);
|
||||
}
|
||||
if (hasData == false) {
|
||||
return NULL;
|
||||
} else {
|
||||
return &(PageHashQueueShmem->gtag[gpushpos]->tag);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -673,8 +673,8 @@ typedef struct FlushInfo
|
||||
|
||||
typedef struct XLogParralFlush
|
||||
{
|
||||
pg_atomic_uint32 begin;
|
||||
uint32 last;
|
||||
pg_atomic_uint64 begin;
|
||||
uint64 last;
|
||||
// uint32 count;
|
||||
// uint32 diff;
|
||||
// FlushInfo wrtResult[128];
|
||||
@ -3459,7 +3459,7 @@ FlushWal(XLogwrtRqst WriteRqst)
|
||||
uint64 count;
|
||||
// int stp;
|
||||
int xlogLength;
|
||||
uint32 curLoc = 0;
|
||||
uint64 curLoc = 0;
|
||||
bool mustDo = false;
|
||||
bool nowrite = false;
|
||||
|
||||
@ -3604,7 +3604,7 @@ mustflush:
|
||||
// SpinLockRelease(&XLogCtl->info_lck);
|
||||
|
||||
// printf("end flush wals, begin %d, curLoc %d, WriteRqst.Write %ld\n", flushInfo.begin, curLoc, WriteRqst.Write);
|
||||
while (pg_atomic_read_u32(&XLogCtl->LogFlush.begin) < curLoc)
|
||||
while (pg_atomic_read_u64(&XLogCtl->LogFlush.begin) < curLoc)
|
||||
{
|
||||
pg_usleep(20L);
|
||||
|
||||
@ -3636,7 +3636,7 @@ mustflush:
|
||||
} else
|
||||
XLogCtl->LogFlush.diff = diff;
|
||||
*/
|
||||
pg_atomic_write_u32(&XLogCtl->LogFlush.begin, curLoc+1);
|
||||
pg_atomic_write_u64(&XLogCtl->LogFlush.begin, curLoc+1);
|
||||
// SpinLockAcquire(&XLogCtl->info_lck);
|
||||
|
||||
// XLogCtl->LogFlush.begin = curLoc+1;
|
||||
@ -5841,7 +5841,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
|
||||
else
|
||||
{
|
||||
/* No valid record available from this source */
|
||||
lastSourceFailed = true;
|
||||
if (xlogreader->streamStart != true) {
|
||||
lastSourceFailed = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* If archive recovery was requested, but we were still doing
|
||||
@ -7991,30 +7993,24 @@ void pushTikv(int onePageListLen,int pageNum,bool flag)
|
||||
SortPageQueue();
|
||||
//wait shared queue data handler
|
||||
while(pageNum > CompletedTaskNum()) {
|
||||
usleep(10);
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
cleanMap();
|
||||
start = end;
|
||||
PushPtr = GetXLogReplayRecPtr(NULL);
|
||||
PrePushPtr = PushPtr;
|
||||
if (ApplyLsn < PushPtr) {
|
||||
ApplyLsn = PushPtr;
|
||||
if (push_standby == true && XLogCtl->pushToDisk != PushPtr && !mpush) {
|
||||
InsertConsistToKV(PushPtr);
|
||||
}
|
||||
if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) {
|
||||
InsertConsistToKV(ApplyLsn);
|
||||
}
|
||||
XLogCtl->pushToDisk = ApplyLsn;
|
||||
XLogCtl->pushToDisk = PushPtr;
|
||||
} else {
|
||||
if (flag == true) {
|
||||
PushPtr = GetXLogReplayRecPtr(NULL);
|
||||
PrePushPtr = PushPtr;
|
||||
if (ApplyLsn < PushPtr) {
|
||||
ApplyLsn = PushPtr;
|
||||
if (push_standby == true && XLogCtl->pushToDisk != PushPtr && !mpush) {
|
||||
InsertConsistToKV(PushPtr);
|
||||
}
|
||||
if (push_standby == true && XLogCtl->pushToDisk != ApplyLsn && !mpush) {
|
||||
InsertConsistToKV(ApplyLsn);
|
||||
}
|
||||
XLogCtl->pushToDisk = ApplyLsn;
|
||||
XLogCtl->pushToDisk = PushPtr;
|
||||
}
|
||||
}
|
||||
|
||||
@ -14734,6 +14730,8 @@ producerXLogParallelBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr,
|
||||
{
|
||||
readLen = 0;
|
||||
readSource = XLOG_FROM_ANY;
|
||||
} else {
|
||||
failedCount = 0;
|
||||
}
|
||||
|
||||
XLogRecPtr maxFlushedUpto = 0;
|
||||
@ -14973,7 +14971,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
|
||||
|
||||
wait_time = wal_retrieve_retry_interval -
|
||||
TimestampDifferenceMilliseconds(last_fail_time, now);
|
||||
|
||||
//this is for multithreaded use: WaitLatch=>WaitEventSetWait=>global variable LatchWaitSet
|
||||
while(!ring_buffer_is_empty(gRingBufferManger) && !IsFreePthreadPool() && !startup_shutdown_requested) {
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
(void)WaitLatch(&XLogCtl->recoveryWakeupLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
@ -15266,6 +15267,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
|
||||
* to react to a trigger file promptly and to check if the
|
||||
* WAL receiver is still active.
|
||||
*/
|
||||
//this is for multithreaded use: WaitLatch=>WaitEventSetWait=>global variable LatchWaitSet
|
||||
while(!ring_buffer_is_empty(gRingBufferManger) && !IsFreePthreadPool() && !startup_shutdown_requested) {
|
||||
pg_usleep(1000L);
|
||||
}
|
||||
(void)WaitLatch(&XLogCtl->recoveryWakeupLatch,
|
||||
WL_LATCH_SET | WL_TIMEOUT |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
@ -15777,3 +15782,16 @@ static void PrecacheHotDataByRules()
|
||||
PQclear(ruleRes);
|
||||
PQfinish(metaConn);
|
||||
}
|
||||
|
||||
/*
 * He3DBGetWalWriteStats
 *		Report a snapshot of the WAL write/flush bookkeeping.
 *
 * XLogCtl->LogwrtResult is copied into LogwrtResult (a variable declared
 * outside this function) and XLogCtl->LogFlush is copied into a local, both
 * while XLogCtl->info_lck is held.  The outputs are derived from those
 * copies once the spinlock has been released.
 *
 *	writtenlsn	receives LogwrtResult.Write
 *	flushlsn	receives LogwrtResult.Flush (atomic read)
 *	totaltimes	receives LogFlush.last
 *	parallels	receives LogFlush.last minus LogFlush.begin (atomic read),
 *				narrowed to int
 */
void
He3DBGetWalWriteStats(XLogRecPtr *writtenlsn, XLogRecPtr *flushlsn, uint64 *totaltimes, int *parallels)
{
	XLogParralFlush flushSnapshot;
	uint64		outstanding;

	/* Copy both shared structures under a single lock acquisition. */
	SpinLockAcquire(&XLogCtl->info_lck);
	LogwrtResult = XLogCtl->LogwrtResult;
	flushSnapshot = XLogCtl->LogFlush;
	SpinLockRelease(&XLogCtl->info_lck);

	/* Difference between the "last" and "begin" counters of the snapshot. */
	outstanding = flushSnapshot.last - pg_atomic_read_u64(&flushSnapshot.begin);

	*writtenlsn = LogwrtResult.Write;
	*flushlsn = (XLogRecPtr) pg_atomic_read_u64(&LogwrtResult.Flush);
	*totaltimes = flushSnapshot.last;
	*parallels = outstanding;
}
|
@ -764,13 +764,14 @@ restart:
|
||||
}
|
||||
//need to tell push standby has new standby add
|
||||
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
if (!he3mirror) {
|
||||
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
|
||||
}
|
||||
if (tmpLsn !=InvalidXLogRecPtr) {
|
||||
pushTikv(0,hashMapSize(),true);
|
||||
if (tmpLsn <= RecPtr) {
|
||||
pg_usleep(200000);
|
||||
pg_usleep(10000);
|
||||
continue;
|
||||
} else {
|
||||
ApplyLsn = tmpLsn;
|
||||
@ -1126,11 +1127,12 @@ restart:
|
||||
}
|
||||
//need to tell push standby has new standby add
|
||||
while(ApplyLsn != InvalidXLogRecPtr && RecPtr >= ApplyLsn) {
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
XLogRecPtr tmpLsn = InvalidXLogRecPtr;
|
||||
if (!he3mirror) {
|
||||
tmpLsn = QueryMinLsn(InvalidXLogRecPtr);
|
||||
}
|
||||
if (tmpLsn !=InvalidXLogRecPtr) {
|
||||
pushTikv(0,hashMapSize(),true);
|
||||
if (tmpLsn <= RecPtr) {
|
||||
pg_usleep(10000);
|
||||
continue;
|
||||
|
@ -866,6 +866,24 @@ CREATE VIEW pg_stat_replication AS
|
||||
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
|
||||
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
|
||||
|
||||
-- Columns reported by pg_stat_get_he3walwrite().
CREATE VIEW pg_stat_he3walwrite AS
    SELECT
        w.write_lsn,
        w.flush_lsn,
        w.writekv_totaltimes,
        w.writekv_parallels
    FROM pg_stat_get_he3walwrite() AS w;
|
||||
|
||||
-- Columns reported by pg_stat_get_he3_logindex().
CREATE VIEW pg_stat_he3_logindex AS
    SELECT
        li.memtable_total,
        li.memtable_used,
        li.memtable_start_index,
        li.memtable_active_index,
        li.page_total
    FROM pg_stat_get_he3_logindex() AS li;
|
||||
|
||||
CREATE VIEW pg_stat_slru AS
|
||||
SELECT
|
||||
s.name,
|
||||
|
@ -336,17 +336,17 @@ static bool
|
||||
CleanUpSecondBuffer(const SdPageKey *pk)
|
||||
{
|
||||
|
||||
LWLock *partitionLock;
|
||||
uint32 newHash;
|
||||
newHash = SecondBufferHashCode(pk);
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
// LWLock *partitionLock;
|
||||
// uint32 newHash;
|
||||
// newHash = SecondBufferHashCode(pk);
|
||||
// partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
// LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
bool found;
|
||||
hash_search(SecondBufferHash,
|
||||
(void *)pk,
|
||||
HASH_REMOVE,
|
||||
&found);
|
||||
LWLockRelease(partitionLock);
|
||||
// LWLockRelease(partitionLock);
|
||||
return found;
|
||||
}
|
||||
|
||||
@ -393,6 +393,7 @@ ReceivePageFromDataBuffer(PageKey *pk, uint8_t *buffer)
|
||||
LWLockRelease(partitionLock);
|
||||
continue;
|
||||
}
|
||||
sdPageValue->canDelete = false;
|
||||
memcpy(sdPageValue->pagecontent, buffer,8192);
|
||||
LWLockRelease(partitionLock);
|
||||
}
|
||||
@ -1300,7 +1301,12 @@ RemovePageOrWalFromCurrentNode()
|
||||
mdb_cursor_del(tmpcursor,0);
|
||||
if (PageOrWal == (int)PAGE)
|
||||
{
|
||||
// CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
|
||||
newHash = SecondBufferHashCode(&((SdPageValue *)data.mv_data)->pk);
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
|
||||
LWLockRelease(partitionLock);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1309,7 +1315,12 @@ RemovePageOrWalFromCurrentNode()
|
||||
//TRUNCATE
|
||||
if (DPArray->dpk[DPArray->walIndex].operation == (int)TRUNCATE)
|
||||
{
|
||||
// CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
|
||||
newHash = SecondBufferHashCode(&((SdPageValue *)data.mv_data)->pk);
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
CleanUpSecondBuffer(&((SdPageValue *)data.mv_data)->pk);
|
||||
LWLockRelease(partitionLock);
|
||||
}
|
||||
else if (NULL == FindSecondBufferInTable(&lpk->sk)) //EVICT
|
||||
{
|
||||
@ -1476,6 +1487,7 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
data.mv_data = spv->pagecontent;
|
||||
|
||||
mp = mdb_put(tmptxn,pageDbi,&key,&data, 0);
|
||||
spv->canDelete = true;
|
||||
LWLockRelease(partitionLock);
|
||||
if (mp != 0)
|
||||
{
|
||||
@ -1511,7 +1523,12 @@ MovePageFromSecondBufferToLocalBuffer()
|
||||
spv = FindSecondBufferInTable(&s->spk);
|
||||
if (spv != NULL)
|
||||
{
|
||||
CleanUpSecondBuffer(&s->spk);
|
||||
newHash = SecondBufferHashCode(&s->spk);
|
||||
partitionLock = SecondBufferMappingPartitionLock(newHash);
|
||||
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
|
||||
if (spv->canDelete)
|
||||
CleanUpSecondBuffer(&s->spk);
|
||||
LWLockRelease(partitionLock);
|
||||
}
|
||||
}
|
||||
if (spkl.head->next != NULL)
|
||||
|
@ -1943,8 +1943,7 @@ retry:
|
||||
/* safety check: should definitely not be our *own* pin */
|
||||
if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
|
||||
elog(ERROR, "buffer is pinned in InvalidateBuffer");
|
||||
// WaitIO(buf);
|
||||
pg_usleep(1000L);
|
||||
WaitIO(buf);
|
||||
goto retry;
|
||||
}
|
||||
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "storage/fsm_internals.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "postmaster/secondbuffer.h"
|
||||
|
||||
|
||||
/*
|
||||
@ -641,6 +642,18 @@ fsm_extend(Relation rel, BlockNumber fsm_nblocks)
|
||||
|
||||
smgrextend(rel->rd_smgr, FSM_FORKNUM, fsm_nblocks_now,
|
||||
pg.data, false);
|
||||
if (!(InitdbSingle || IsBootstrapProcessingMode() == true) && !push_standby && !he3mirror)
|
||||
{
|
||||
PageKey pageKey;
|
||||
pageKey.relfileNode.dbNode = rel->rd_smgr->smgr_rnode.node.dbNode;
|
||||
pageKey.relfileNode.relNode = rel->rd_smgr->smgr_rnode.node.relNode;
|
||||
|
||||
pageKey.blkNo = fsm_nblocks_now;
|
||||
pageKey.forkNo = FSM_FORKNUM;
|
||||
pageKey.pageLsn = 0;
|
||||
|
||||
ReceivePageFromDataBuffer(&pageKey, (uint8_t *) pg.data);
|
||||
}
|
||||
fsm_nblocks_now++;
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,7 @@ LogIndexMemListSize(uint64 he3db_logindex_mem_size)
|
||||
if (logindex_mem_tbl_size < 3)
|
||||
elog(FATAL, "The number=%ld of logindex memory table is less than 3", logindex_mem_tbl_size);
|
||||
else
|
||||
ereport(LOG, (errmsg("The total log index memory table size is %ld", size)));
|
||||
ereport(LOG, (errmsg("The total log index memory table size is %ld, number logindex mem-table size is %ld", size, logindex_mem_tbl_size)));
|
||||
|
||||
return size;
|
||||
}
|
||||
@ -42,20 +42,21 @@ static void SetNewPageItem(LogIndexMemTBL *mem_tbl, const BufferTag *page)
|
||||
static LogIndexMemTBL *GetNextFreeMemTbl(void)
|
||||
{
|
||||
// TODO change to Lightweight Lock
|
||||
LWLockAcquire(LogIndexMemListLock,LW_EXCLUSIVE);
|
||||
// Circular List
|
||||
log_index_mem_list->active_table_index = (log_index_mem_list->active_table_index + 1)%(log_index_mem_list->table_cap);
|
||||
LWLockRelease(LogIndexMemListLock);
|
||||
uint64 active_tbl_index = (log_index_mem_list->active_table_index + 1)%(log_index_mem_list->table_cap);
|
||||
// if all mem table is full, waiting for recycle
|
||||
if(log_index_mem_list->active_table_index == log_index_mem_list->table_start_index)
|
||||
if(active_tbl_index == log_index_mem_list->table_start_index)
|
||||
{
|
||||
elog(LOG, "Mem table is full, waiting for cleanup. Total size: %ld", logindex_mem_tbl_size);
|
||||
}
|
||||
while(log_index_mem_list->active_table_index == log_index_mem_list->table_start_index)
|
||||
while(active_tbl_index == log_index_mem_list->table_start_index)
|
||||
{
|
||||
pg_usleep(10); /* 10 us */
|
||||
}
|
||||
elog(DEBUG5, "Find next free mem table");
|
||||
elog(DEBUG5, "Find next free mem table and set active_table_index + 1: %ld", active_tbl_index);
|
||||
LWLockAcquire(LogIndexMemListLock,LW_EXCLUSIVE);
|
||||
// Circular List
|
||||
log_index_mem_list->active_table_index = active_tbl_index;
|
||||
LWLockRelease(LogIndexMemListLock);
|
||||
// if it finds free mem table will return directly.
|
||||
return &(log_index_mem_list->mem_table[log_index_mem_list->active_table_index]);
|
||||
}
|
||||
@ -827,3 +828,30 @@ void FreeTagNode(TagNode *head)
|
||||
tn = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
 * He3DBGetLogindexStats
 *		Collect usage figures for the logindex mem-table list.
 *
 * The start index, active index and capacity of log_index_mem_list are read
 * while LogIndexMemListLock is held in shared mode.  The per-table page
 * counters are read afterwards, without the lock.
 *
 *	memtable_total			receives table_cap
 *	memtable_used			receives the circular distance from the start
 *							index to the active index, plus one
 *	memtable_active_index	receives active_table_index
 *	memtable_start_index	receives table_start_index
 *	page_total				receives the sum of (meta.page_free_head - 2)
 *							over the tables from start up to the active one;
 *							the active table is counted only when its state
 *							is not LOG_INDEX_MEM_TBL_STATE_FREE
 *
 * NOTE(review): the "- 2" offset presumably accounts for reserved slots at
 * the head of each table -- confirm against the mem-table layout.
 */
void
He3DBGetLogindexStats(uint64 *memtable_total, uint64 *memtable_used, uint64 *memtable_active_index,
					  uint64 *memtable_start_index, uint64 *page_total)
{
	uint64		first;
	uint64		active;
	uint64		cap;
	uint64		idx;
	uint64		pages = 0;
	LogIndexMemTBL *tbl;

	LWLockAcquire(LogIndexMemListLock, LW_SHARED);
	first = log_index_mem_list->table_start_index;
	active = log_index_mem_list->active_table_index;
	cap = log_index_mem_list->table_cap;
	LWLockRelease(LogIndexMemListLock);

	*memtable_start_index = first;
	*memtable_active_index = active;
	*memtable_total = cap;
	*memtable_used = ((active - first) + cap) % cap + 1;

	/* Walk the circular list from the start table up to the active one. */
	for (idx = first; idx != active; idx = (idx + 1) % cap)
	{
		tbl = &(log_index_mem_list->mem_table[idx]);
		pages = pages + tbl->meta.page_free_head - 2;
	}

	/* The walk stops on the active table; count it only if it is in use. */
	tbl = &(log_index_mem_list->mem_table[active]);
	if (pg_atomic_read_u32(&tbl->meta.state) != LOG_INDEX_MEM_TBL_STATE_FREE)
		pages = pages + tbl->meta.page_free_head - 2;

	*page_total = pages;
}
|
||||
|
@ -769,7 +769,7 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
|
||||
if (false == flag) {
|
||||
XLogRecPtr minApplyLsn;
|
||||
do {
|
||||
sleep(1);
|
||||
pg_usleep(200000);
|
||||
if (!EnableHotStandby || *isPromoteIsTriggered)
|
||||
minApplyLsn = QueryPushChkpointLsn();
|
||||
else
|
||||
|
@ -25,12 +25,15 @@
|
||||
#include "postmaster/bgworker_internals.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "replication/slot.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/procarray.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/inet.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/timestamp.h"
|
||||
#include "storage/he3db_logindex.h"
|
||||
|
||||
#define UINT32_ACCESS_ONCE(var) ((uint32)(*((volatile uint32 *)&(var))))
|
||||
|
||||
@ -2381,3 +2384,98 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
|
||||
/* Returns the record as Datum */
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns statistics of WAL activity
|
||||
*/
|
||||
Datum
|
||||
pg_stat_get_he3walwrite(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_HE3WALWRITE_COLS 4
|
||||
TupleDesc tupdesc;
|
||||
Datum values[PG_STAT_GET_HE3WALWRITE_COLS];
|
||||
bool nulls[PG_STAT_GET_HE3WALWRITE_COLS];
|
||||
XLogRecPtr writtenlsn, flushlsn;
|
||||
uint64 writtenTimes;
|
||||
int parallels;
|
||||
|
||||
/* Initialise values and NULL flags arrays */
|
||||
MemSet(values, 0, sizeof(values));
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
|
||||
/* Initialise attributes information in the tuple descriptor */
|
||||
tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_HE3WALWRITE_COLS);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "write_lsn",
|
||||
PG_LSNOID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "flush_lsn",
|
||||
PG_LSNOID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "writekv_totaltimes",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "writekv_parallels",
|
||||
INT4OID, -1, 0);
|
||||
|
||||
BlessTupleDesc(tupdesc);
|
||||
|
||||
/* Get statistics about WAL Write */
|
||||
if (EnableHotStandby && *isPromoteIsTriggered == false)
|
||||
PG_RETURN_NULL();
|
||||
|
||||
He3DBGetWalWriteStats(&writtenlsn, &flushlsn, &writtenTimes, ¶llels);
|
||||
|
||||
/* Fill values and NULLs */
|
||||
values[0] = LSNGetDatum(writtenlsn);
|
||||
values[1] = LSNGetDatum(flushlsn);
|
||||
values[2] = UInt64GetDatum(writtenTimes);
|
||||
values[3] = Int32GetDatum(parallels);
|
||||
|
||||
/* Returns the record as Datum */
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns statistics of logindex
|
||||
*/
|
||||
Datum
|
||||
pg_stat_get_he3_logindex(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#define PG_STAT_GET_HE3_LOGINDEX_COLS 5
|
||||
TupleDesc tupdesc;
|
||||
Datum values[PG_STAT_GET_HE3_LOGINDEX_COLS];
|
||||
bool nulls[PG_STAT_GET_HE3_LOGINDEX_COLS];
|
||||
uint64 memtable_total;
|
||||
uint64 memtable_used;
|
||||
uint64 memtable_active_index;
|
||||
uint64 memtable_start_index;
|
||||
uint64 page_total;
|
||||
|
||||
/* Initialise values and NULL flags arrays */
|
||||
MemSet(values, 0, sizeof(values));
|
||||
MemSet(nulls, 0, sizeof(nulls));
|
||||
|
||||
/* Initialise attributes information in the tuple descriptor */
|
||||
tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_HE3_LOGINDEX_COLS);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "memtable_total",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "memtable_used",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "memtable_start_index",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "memtable_active_index",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "page_total",
|
||||
INT8OID, -1, 0);
|
||||
|
||||
BlessTupleDesc(tupdesc);
|
||||
|
||||
He3DBGetLogindexStats(&memtable_total, &memtable_used, &memtable_active_index, &memtable_start_index, &page_total);
|
||||
|
||||
/* Fill values and NULLs */
|
||||
values[0] = UInt64GetDatum(memtable_total);
|
||||
values[1] = UInt64GetDatum(memtable_used);
|
||||
values[2] = UInt64GetDatum(memtable_start_index);
|
||||
values[3] = UInt64GetDatum(memtable_active_index);
|
||||
values[4] = UInt64GetDatum(page_total);
|
||||
|
||||
/* Returns the record as Datum */
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
|
||||
}
|
@ -2333,7 +2333,7 @@ static struct config_int ConfigureNamesInt[] =
|
||||
GUC_UNIT_S
|
||||
},
|
||||
&wal_receiver_status_interval,
|
||||
10, 0, INT_MAX / 1000,
|
||||
1, 0, INT_MAX / 1000,
|
||||
NULL, NULL, NULL
|
||||
},
|
||||
|
||||
|
@ -93,7 +93,7 @@
|
||||
# - Authentication -
|
||||
|
||||
#authentication_timeout = 1min # 1s-600s
|
||||
#password_encryption = scram-sha-256 # scram-sha-256 or md5
|
||||
|
||||
#db_user_namespace = off
|
||||
|
||||
# GSSAPI using Kerberos
|
||||
|
@ -354,6 +354,7 @@ extern void UpdateControlFile(void);
|
||||
extern void PushUpdateControlFile(void);
|
||||
extern void PushCheckPointGuts(XLogRecPtr checkPointRedo, int flags);
|
||||
extern XLogRecPtr GetXLogPushToDisk(void);
|
||||
extern void SetXLogPushToDisk(XLogRecPtr pushToDiskLsn);
|
||||
|
||||
extern uint64 GetSystemIdentifier(void);
|
||||
extern char *GetMockAuthenticationNonce(void);
|
||||
@ -429,6 +430,7 @@ extern void do_pg_abort_backup(int code, Datum arg);
|
||||
extern void register_persistent_abort_backup_handler(void);
|
||||
extern SessionBackupState get_backup_status(void);
|
||||
extern void pushXlogToTikv(char*data,int len);
|
||||
extern void He3DBGetWalWriteStats(XLogRecPtr *writtenlsn, XLogRecPtr *flushlsn, uint64 *totaltimes, int *parallels);
|
||||
|
||||
/* File path names (all relative to $PGDATA) */
|
||||
#define RECOVERY_SIGNAL_FILE "recovery.signal"
|
||||
|
@ -5582,6 +5582,22 @@
|
||||
proargnames => '{wal_records,wal_fpi,wal_bytes,wal_buffers_full,wal_write,wal_sync,wal_write_time,wal_sync_time,stats_reset}',
|
||||
prosrc => 'pg_stat_get_wal' },
|
||||
|
||||
{ oid => '6206', descr => 'statistics: information about He3DB WAL Write',
|
||||
proname => 'pg_stat_get_he3walwrite', proisstrict => 'f', provolatile => 's',
|
||||
proparallel => 'r', prorettype => 'record', proargtypes => '',
|
||||
proallargtypes => '{pg_lsn,pg_lsn,int8,int4}',
|
||||
proargmodes => '{o,o,o,o}',
|
||||
proargnames => '{write_lsn,flush_lsn,writekv_totaltimes,writekv_parallels}',
|
||||
prosrc => 'pg_stat_get_he3walwrite' },
|
||||
|
||||
{ oid => '6207', descr => 'statistics: information about He3DB LogIndex',
|
||||
proname => 'pg_stat_get_he3_logindex', proisstrict => 'f', provolatile => 's',
|
||||
proparallel => 'r', prorettype => 'record', proargtypes => '',
|
||||
proallargtypes => '{int8,int8,int8,int8,int8}',
|
||||
proargmodes => '{o,o,o,o,o}',
|
||||
proargnames => '{memtable_total,memtable_used,memtable_start_index,memtable_active_index,page_total}',
|
||||
prosrc => 'pg_stat_get_he3_logindex' },
|
||||
|
||||
{ oid => '2306', descr => 'statistics: information about SLRU caches',
|
||||
proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
|
||||
proretset => 't', provolatile => 's', proparallel => 'r',
|
||||
|
@ -70,7 +70,8 @@ typedef struct OriginDPageKey
|
||||
|
||||
typedef struct SdPageValue
|
||||
{
|
||||
SdPageKey pk;
|
||||
SdPageKey pk;
|
||||
bool canDelete;
|
||||
uint8 pagecontent[BLKSZ];
|
||||
} SdPageValue;
|
||||
|
||||
|
@ -117,4 +117,6 @@ extern void FreeLsnNode(LsnNode *head);
|
||||
extern TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn);
|
||||
extern void FreeTagNode(TagNode *head);
|
||||
extern bool CheckBufTagExistByLsnRange(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn);
|
||||
extern void He3DBGetLogindexStats(uint64 *memtable_total, uint64 *memtable_used, uint64 *memtable_active_index,
|
||||
uint64 *memtable_start_index, uint64 *page_total);
|
||||
#endif /* HE3DB_LOGINDEX_H */
|
Loading…
Reference in New Issue
Block a user