feat: logindex support CheckBufTagExistByLsnRange

This commit is contained in:
wangyao 2023-05-30 17:54:23 +08:00
parent ebdbde1b86
commit 2d2441e18e
2 changed files with 140 additions and 3 deletions

View File

@ -606,7 +606,8 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
}else if(mem_tbl->meta.min_lsn >= end_lsn)
{
// there is no suitability lsn_list after this mem table
break;
LWLockRelease(LogIndexMemListLock);
return head_node;
}
else
{
@ -627,7 +628,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
if(page_min_lsn > end_lsn || page_max_lsn < start_lsn)
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn)
{
continue;
}
@ -637,6 +638,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
}
}
}
LWLockRelease(LogIndexMemListLock);
return head_node;
}
}
@ -663,7 +665,7 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
if(page_min_lsn > end_lsn || page_max_lsn < start_lsn)
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn)
{
continue;
}
@ -680,6 +682,140 @@ TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn)
return head_node;
}
bool CheckBufTagExistByLsnRange(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
{
uint64 tbl_index;
LogIndexMemItemSeg *first_seg;
LogIndexMemItemSeg *last_seg;
XLogRecPtr page_min_lsn;
XLogRecPtr page_max_lsn;
uint32 hash_key;
LogIndexMemItemHead *page_head;
// Prevent metadata changes during discovery.
LWLockAcquire(LogIndexMemListLock,LW_SHARED);
tbl_index = log_index_mem_list->table_start_index;
loop:
while(tbl_index != log_index_mem_list->active_table_index)
{
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]);
tbl_index = (tbl_index + 1)%(log_index_mem_list->table_cap);
// current mem table no suitability lsn_list
if(mem_tbl->meta.max_lsn < start_lsn)
{
continue;
}else if(mem_tbl->meta.min_lsn >= end_lsn)
{
// there is no suitability lsn_list after this mem table
goto outerloop;
}
else
{
// find page from current mem table
hash_key = LOG_INDEX_MEM_TBL_HASH_PAGE(page);
if(mem_tbl->hash[hash_key] != LOG_INDEX_TBL_INVALID_SEG)
{
page_head = &(mem_tbl->page_head[mem_tbl->hash[hash_key]-1]);
while(!BUFFERTAGS_EQUAL(page_head->tag, *page)){
if(page_head->next_item == LOG_INDEX_TBL_INVALID_SEG)
{
// cannot find page from current mem table
goto loop;
}
page_head = &(mem_tbl->page_head[page_head->next_item-1]);
}
// find request page, but not lsn
if(page_head->next_seg == LOG_INDEX_TBL_INVALID_SEG || page_head->tail_seg == LOG_INDEX_TBL_INVALID_SEG)
{
continue;
}
else
{
first_seg = &(mem_tbl->seg_item[page_head->next_seg - 1]);
last_seg = &(mem_tbl->seg_item[page_head->tail_seg - 1]);
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
// lsn not correspond with request
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn)
{
continue;
}
else
{
// find one
LWLockRelease(LogIndexMemListLock);
return true;
}
}
}else
{
continue;
}
}
}
if (tbl_index == log_index_mem_list->active_table_index){
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]);
tbl_index = (tbl_index + 1)%(log_index_mem_list->table_cap);
// current mem table no suitability lsn_list
if(mem_tbl->meta.max_lsn < start_lsn)
{
goto outerloop;
}else if(mem_tbl->meta.min_lsn >= end_lsn)
{
// there is no suitability lsn_list after this mem table
goto outerloop;
}
else
{
// find page from current mem table
hash_key = LOG_INDEX_MEM_TBL_HASH_PAGE(page);
if(mem_tbl->hash[hash_key] != LOG_INDEX_TBL_INVALID_SEG)
{
page_head = &(mem_tbl->page_head[mem_tbl->hash[hash_key]-1]);
while(!BUFFERTAGS_EQUAL(page_head->tag, *page)){
if(page_head->next_item == LOG_INDEX_TBL_INVALID_SEG)
{
// cannot find page from current mem table
goto outerloop;
}
page_head = &(mem_tbl->page_head[page_head->next_item-1]);
}
// find request page
if(page_head->next_seg == LOG_INDEX_TBL_INVALID_SEG || page_head->tail_seg == LOG_INDEX_TBL_INVALID_SEG)
{
goto outerloop;
}
else
{
first_seg = &(mem_tbl->seg_item[page_head->next_seg - 1]);
last_seg = &(mem_tbl->seg_item[page_head->tail_seg - 1]);
page_min_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, first_seg->suffix_lsn[0]);
uint8 id = Min(LOG_INDEX_MEM_ITEM_SEG_LSN_NUM - 1, last_seg->number - 1);
page_max_lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, last_seg->suffix_lsn[id]);
if(page_min_lsn >= end_lsn || page_max_lsn < start_lsn)
{
goto outerloop;
}
else
{
// find one
LWLockRelease(LogIndexMemListLock);
return true;
}
}
}else
{
goto outerloop;
}
}
}
outerloop:
LWLockRelease(LogIndexMemListLock);
return false;
}
void FreeTagNode(TagNode *head)
{
TagNode* tn;

View File

@ -116,4 +116,5 @@ extern LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, X
extern void FreeLsnNode(LsnNode *head);
extern TagNode *GetBufTagByLsnRange(XLogRecPtr start_lsn, XLogRecPtr end_lsn);
extern void FreeTagNode(TagNode *head);
extern bool CheckBufTagExistByLsnRange(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn);
#endif /* HE3DB_LOGINDEX_H */