!183 parallel replay panic

Merge pull request !183 from shenzhengntu/dev_performance
This commit is contained in:
shipixian 2023-06-28 02:33:47 +00:00 committed by Gitee
commit f86c2a823f
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
4 changed files with 50 additions and 44 deletions

View File

@ -93,7 +93,7 @@ max_connections = 100 # (change requires restart)
# - Authentication -
#authentication_timeout = 1min # 1s-600s
#password_encryption = scram-sha-256 # scram-sha-256 or md5
#db_user_namespace = off
# GSSAPI using Kerberos

View File

@ -93,7 +93,7 @@ max_connections = 100 # (change requires restart)
# - Authentication -
#authentication_timeout = 1min # 1s-600s
password_encryption = md5 # scram-sha-256 or md5
#db_user_namespace = off
# GSSAPI using Kerberos

View File

@ -470,24 +470,29 @@ static HTAB *PageCountHash = NULL;
static uint32_t curLatchPos = 0;
typedef struct {
pg_atomic_uint32 gpushpos;
pg_atomic_uint32 ready;
pg_atomic_uint32 gpos;
slock_t mutex;
volatile uint32 gpushpos;
volatile bool ready;
volatile uint32 gpos;
pg_atomic_uint32 latchPos;
pg_atomic_uint32 taskNum;
pg_atomic_uint32 modifyNum;
uint32 modifyNum;
Latch pageFlushWakeupLatch[PARALLEL_NUM];
PageValue*gtag[G_QUEUE_LEN];
}PageHashQueueShmemStruct;
static PageHashQueueShmemStruct *PageHashQueueShmem;
void pushSlaveReplayQueue(int pageNum) {
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
if (PageHashQueueShmem->gpos != 0 && PageHashQueueShmem->ready == false) {
SpinLockAcquire(&PageHashQueueShmem->mutex);
PageHashQueueShmem->ready = true;
SpinLockRelease(&PageHashQueueShmem->mutex);
WakeupFlushWork();
}
}
while(pageNum > CompletedTaskNum()) {
usleep(10);
pg_usleep(1000L);
}
cleanMap();
}
@ -547,12 +552,15 @@ PageHashQueueShmemInit(void)
if (!found)
{
pg_atomic_init_u32(&PageHashQueueShmem->ready,0);
pg_atomic_init_u32(&PageHashQueueShmem->gpushpos,0);
pg_atomic_init_u32(&PageHashQueueShmem->latchPos,0);
SpinLockInit(&PageHashQueueShmem->mutex);
SpinLockAcquire(&PageHashQueueShmem->mutex);
PageHashQueueShmem->ready = false;
PageHashQueueShmem->gpushpos = 0;
SpinLockRelease(&PageHashQueueShmem->mutex);
PageHashQueueShmem->gpos = 0;
pg_atomic_init_u32(&PageHashQueueShmem->taskNum,0);
pg_atomic_init_u32(&PageHashQueueShmem->gpos, 0);
pg_atomic_init_u32(&PageHashQueueShmem->modifyNum,0);
pg_atomic_init_u32(&PageHashQueueShmem->latchPos, 0);
PageHashQueueShmem->modifyNum = 0;
for (int i = 0;i<PARALLEL_NUM;i++) {
InitSharedLatch(&PageHashQueueShmem->pageFlushWakeupLatch[i]);
}
@ -567,7 +575,6 @@ Size PageHashMapSize(void) {
return RedoStartPointSize() + hash_estimate_size(G_QUEUE_LEN,sizeof(PageValue));
}
void
InitBufferPoolHashMap(void)
{
@ -605,11 +612,12 @@ uint32_t addFileKey(BufferTag*onePage) {
&found);
if (found == false) {
result->num = 0;
uint32_t gpos = pg_atomic_fetch_add_u32(&PageHashQueueShmem->gpos,1);
uint32_t gpos = PageHashQueueShmem->gpos++;
PageHashQueueShmem->gtag[gpos] = result;
}
result->num++;
return pg_atomic_fetch_add_u32(&PageHashQueueShmem->modifyNum,1);
PageHashQueueShmem->modifyNum++;
return PageHashQueueShmem->modifyNum;
}
void cleanMap(void) {
@ -624,11 +632,13 @@ void cleanMap(void) {
HASH_REMOVE, NULL) == NULL)
elog(ERROR, "hash table corrupted");
}
pg_atomic_init_u32(&PageHashQueueShmem->ready,0);
pg_atomic_init_u32(&PageHashQueueShmem->gpos, 0);
pg_atomic_init_u32(&PageHashQueueShmem->gpushpos,0);
SpinLockAcquire(&PageHashQueueShmem->mutex);
PageHashQueueShmem->ready = false;
PageHashQueueShmem->gpushpos = 0;
SpinLockRelease(&PageHashQueueShmem->mutex);
PageHashQueueShmem->gpos = 0;
pg_atomic_init_u32(&PageHashQueueShmem->taskNum,0);
pg_atomic_init_u32(&PageHashQueueShmem->modifyNum,0);
PageHashQueueShmem->modifyNum = 0;
}
uint32_t hashMapSize(void) {
@ -640,35 +650,31 @@ static int cmp(const void* a,const void* b) {
}
void SortPageQueue(void) {
if (pg_atomic_read_u32(&PageHashQueueShmem->gpos) != 0 && pg_atomic_read_u32(&PageHashQueueShmem->ready) == 0) {
qsort(PageHashQueueShmem->gtag,pg_atomic_read_u32(&PageHashQueueShmem->gpos),sizeof(PageValue*),cmp);
pg_atomic_exchange_u32(&PageHashQueueShmem->ready,1);
if (PageHashQueueShmem->gpos != 0 && PageHashQueueShmem->ready == false) {
qsort(PageHashQueueShmem->gtag,PageHashQueueShmem->gpos,sizeof(PageValue*),cmp);
SpinLockAcquire(&PageHashQueueShmem->mutex);
PageHashQueueShmem->ready = true;
SpinLockRelease(&PageHashQueueShmem->mutex);
WakeupFlushWork();
return;
}
}
BufferTag* QueuePushPage(void) {
uint32_t gpushpos = pg_atomic_read_u32(&PageHashQueueShmem->gpushpos);
uint32_t gpos = pg_atomic_read_u32(&PageHashQueueShmem->gpos);
uint32_t ready = pg_atomic_read_u32(&PageHashQueueShmem->ready);
if (ready == 0 ||
gpushpos >= gpos){
return NULL;
}
gpushpos = pg_atomic_fetch_add_u32(&PageHashQueueShmem->gpushpos,1);
uint32_t ReGpos = pg_atomic_read_u32(&PageHashQueueShmem->gpos);
ready = pg_atomic_read_u32(&PageHashQueueShmem->ready);
/* only ready 1 to 0 then 0 to 1 will produce a serious problem,
reason functions of pushTikv trigger the next batch of data processing
cost time less than (pg_atomic_read_u32 and pg_atomic_fetch_add_u32) */
if (ready!= 0 && gpushpos < gpos) {
return &(PageHashQueueShmem->gtag[gpushpos]->tag);
} else {
if (ReGpos!=0 && gpushpos < gpos) {
elog(PANIC,"QueuePushPage gpushpos %d < gpos %d",gpushpos,gpos);
uint32_t gpushpos;
bool hasData = false;
if (PageHashQueueShmem->ready == true) {
SpinLockAcquire(&PageHashQueueShmem->mutex);
if (PageHashQueueShmem->ready == true && PageHashQueueShmem->gpushpos < PageHashQueueShmem->gpos) {
hasData = true;
gpushpos = PageHashQueueShmem->gpushpos++;
}
SpinLockRelease(&PageHashQueueShmem->mutex);
}
if (hasData == false) {
return NULL;
} else {
return &(PageHashQueueShmem->gtag[gpushpos]->tag);
}
}

View File

@ -93,7 +93,7 @@
# - Authentication -
#authentication_timeout = 1min # 1s-600s
#password_encryption = scram-sha-256 # scram-sha-256 or md5
#db_user_namespace = off
# GSSAPI using Kerberos