Merge remote-tracking branch 'upstream/dev_performance' into dev_performance

This commit is contained in:
zoujia 2023-03-23 09:44:26 +08:00
commit 5334789ae1
15 changed files with 797 additions and 262 deletions

View File

@ -178,7 +178,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
for (block = first_block; block <= last_block; ++block)
{
CHECK_FOR_INTERRUPTS();
smgrread(rel->rd_smgr, forkNumber, block, blockbuffer.data, GetXLogWriteRecPtr());
smgrread(rel->rd_smgr, forkNumber, block, blockbuffer.data);
++blocks_done;
}
}

View File

@ -807,9 +807,9 @@ typedef struct XLogCtlData
XLogRecPtr lastFpwDisableRecPtr;
long timestamp;
long timestp;
// long timestp;
long oldflush;
long ol;
// long ol;
XLogRecPtr globalUpto;
@ -1055,9 +1055,7 @@ static int get_sync_bit(int method);
static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos);
static void He3DBCopyXLogRecordToWAL(int write_len, bool isLogSwitch,
XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos);
static void He3DBCopyXLogRecordToWAL(int write_len, XLogRecPtr StartPos, XLogRecPtr EndPos);
static void ReserveXLogInsertLocation(int size, int firstsize, XLogRecPtr *StartPos,
XLogRecPtr *EndPos, XLogRecPtr *PrevPtr, XLogRecPtr *startbytepos);
static void He3DBReserveXLogInsertLocation(int size, int firstsize, XLogRecPtr *StartPos,
@ -1472,11 +1470,8 @@ He3DBXLogInsertRecord(XLogRecData *rdata,
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
pg_crc32c rdata_crc;
bool inserted;
XLogRecord *rechdr = (XLogRecord *) rdata->data;
uint8 info = rechdr->xl_info & ~XLR_INFO_MASK;
bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID &&
info == XLOG_SWITCH);
XLogRecPtr StartPos; //本次预留空间的起始位置
XLogRecPtr EndPos; //本次预留空间的结束位置
bool prevDoPageWrites = doPageWrites;
@ -1567,74 +1562,31 @@ He3DBXLogInsertRecord(XLogRecData *rdata,
* pointer.
*/
XLogRecPtr startbytepos;
if (isLogSwitch) {
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
rechdr->xl_end = EndPos;
}
else
{
He3DBReserveXLogInsertLocation(group_total_len, rechdr->xl_tot_len, &StartPos, &EndPos,
He3DBReserveXLogInsertLocation(group_total_len, rechdr->xl_tot_len, &StartPos, &EndPos,
&rechdr->xl_prev,&startbytepos);
inserted = true;
}
uint32 xlog_write_bytes = 0;
if (inserted)
XLogRecPtr tmpStartPos;
XLogRecPtr tmpEndPos;
for (int i = 0; i < grouo_rec_count; i++)
{
XLogRecPtr tmpStartPos;
XLogRecPtr tmpEndPos;
for (int i = 0; i < grouo_rec_count; i++)
{
rechdr = (XLogRecord *)grouphead[i];
/*
* Now that xl_prev has been filled in, calculate CRC of the record
* header.
*/
rdata_crc = rechdr->xl_crc;
COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
/*
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
rdata = (XLogRecData *)&groupRecData[i];
if (isLogSwitch != true) {
tmpStartPos = XLogBytePosToRecPtr(startbytepos);
startbytepos += grouplens[i];
tmpEndPos = XLogBytePosToEndRecPtr(startbytepos);
} else {
tmpStartPos = StartPos;
tmpEndPos = EndPos;
}
He3DBCopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata,
tmpStartPos, tmpEndPos);
xlog_write_bytes += rechdr->xl_tot_len;
}
rechdr = (XLogRecord *)grouphead[i];
/*
* Unless record is flagged as not important, update LSN of last
* important record in the current slot. When holding all locks, just
* update the first one.
* Now that xl_prev has been filled in, calculate CRC of the record
* header.
*/
// if ((flags & XLOG_MARK_UNIMPORTANT) == 0)
// {
// int lockno = holdingAllLocks ? 0 : MyLockNo;
rdata_crc = rechdr->xl_crc;
COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
}
// WALInsertLocks[lockno].l.lastImportantAt = StartPos;
// }
}
else
{
/*
* This was an xlog-switch record, but the current insert location was
* already exactly at the beginning of a segment, so there was no need
* to do anything.
*/
}
/*
* All the record data, including the header, is now ready to be
* inserted. Copy the record in the space reserved.
*/
He3DBCopyXLogRecordToWAL(group_total_len, StartPos, EndPos);
/*
* Done! Let others know that we're finished.
@ -1645,20 +1597,6 @@ He3DBXLogInsertRecord(XLogRecData *rdata,
END_CRIT_SECTION();
/*
* Update shared LogwrtRqst.Write, if we crossed page boundary.
*/
// if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
// {
// SpinLockAcquire(&XLogCtl->info_lck);
// /* advance global request to include new block(s) */
// if (XLogCtl->LogwrtRqst.Write < EndPos)
// XLogCtl->LogwrtRqst.Write = EndPos;
// /* update local result copy while I have the chance */
// LogwrtResult = XLogCtl->LogwrtResult;
// SpinLockRelease(&XLogCtl->info_lck);
// }
SpinLockAcquire(&XLogCtl->info_lck);
/* advance global request to include new block(s) */
if (XLogCtl->LogwrtRqst.Write < EndPos)
@ -1668,36 +1606,6 @@ He3DBXLogInsertRecord(XLogRecData *rdata,
SpinLockRelease(&XLogCtl->info_lck);
/*
* If this was an XLOG_SWITCH record, flush the record and the empty
* padding space that fills the rest of the segment, and perform
* end-of-segment actions (eg, notifying archiver).
*/
if (isLogSwitch)
{
TRACE_POSTGRESQL_WAL_SWITCH();
XLogFlush(EndPos);
/*
* Even though we reserved the rest of the segment for us, which is
* reflected in EndPos, we return a pointer to just the end of the
* xlog-switch record.
*/
if (inserted)
{
EndPos = StartPos + SizeOfXLogRecord;
if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
{
uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size);
if (offset == EndPos % XLOG_BLCKSZ)
EndPos += SizeOfXLogLongPHD;
else
EndPos += SizeOfXLogShortPHD;
}
}
}
#ifdef WAL_DEBUG1
if (XLOG_DEBUG)
{
@ -1755,12 +1663,10 @@ He3DBXLogInsertRecord(XLogRecData *rdata,
XactLastRecEnd = EndPos;
/* Report WAL traffic to the instrumentation. */
if (inserted)
{
pgWalUsage.wal_bytes += xlog_write_bytes;
pgWalUsage.wal_records+=grouo_rec_count;
pgWalUsage.wal_fpi += num_fpi;
}
pgWalUsage.wal_bytes += group_total_len;
pgWalUsage.wal_records+=grouo_rec_count;
pgWalUsage.wal_fpi += num_fpi;
return EndPos;
}
@ -2208,14 +2114,13 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
}
static void
He3DBCopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
XLogRecPtr StartPos, XLogRecPtr EndPos)
He3DBCopyXLogRecordToWAL(int write_len, XLogRecPtr StartPos, XLogRecPtr EndPos)
{
char *currpos;
// int freespace;
int written;
XLogRecPtr CurrPos;
// XLogPageHeader pagehdr;
XLogRecData *rdata;
/*
* 1xlog buffer的大小
@ -2240,10 +2145,13 @@ He3DBCopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
written = 0;
// if (rdata != NULL)
// {
while (rdata != NULL)
for (int i = 0; i < grouo_rec_count; i++)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
rdata = (XLogRecData *)&groupRecData[i];
while (rdata != NULL)
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
// if (rdata_len > remaindXlogBufferLength)
// {
@ -2259,9 +2167,10 @@ He3DBCopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
// }
CurrPos += rdata_len;
written += rdata_len;
rdata = rdata->next;
CurrPos += rdata_len;
written += rdata_len;
rdata = rdata->next;
}
}
// memcpy(currpos,1,1);
// CurrPos ++;
@ -3158,7 +3067,6 @@ He3DBAdvanceXLInsertBuffer(int xlogLength, XLogRecPtr upto, bool opportunistic)
// else
if (upto + xlogLength - LogwrtResult.Flush >= ((XLOGbuffers-2) * XLOG_BLCKSZ))
{
printf(".......I am in.........\n");
/* Have to write it ourselves */
TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
WriteRqst.Write = upto;
@ -3753,9 +3661,9 @@ FlushWal(XLogwrtRqst WriteRqst)
{
char *from = NULL;
uint8 part[4];
// uint8 part[4];
uint64 count;
int stp;
// int stp;
int xlogLength;
uint32 curLoc = 0;
bool mustDo = false;
@ -3768,7 +3676,7 @@ mustflush:
SpinLockAcquire(&XLogCtl->info_lck);
if(WriteRqst.Write - XLogCtl->LogwrtResult.Write < 8192 && (!mustDo))
{
elog(LOG,"=+= first time,goto while{},WriteRqst.Write is %llu,XLogCtl->LogwrtResult.Write is %llu",WriteRqst.Write,XLogCtl->LogwrtResult.Write);
// elog(LOG,"=+= first time,goto while{},WriteRqst.Write is %llu,XLogCtl->LogwrtResult.Write is %llu",WriteRqst.Write,XLogCtl->LogwrtResult.Write);
SpinLockRelease(&XLogCtl->info_lck);
}
else
@ -3779,17 +3687,17 @@ mustflush:
XLogCtl->LogwrtResult.Write = WriteRqst.Write;
curLoc = XLogCtl->LogFlush.last;
XLogCtl->LogFlush.last += 1;
printf("curLoc %d, WriteRqst.Write %ld, LogwrtResult.Write %ld\n", curLoc, WriteRqst.Write, LogwrtResult.Write);
// printf("curLoc %d, WriteRqst.Write %ld, LogwrtResult.Write %ld\n", curLoc, WriteRqst.Write, LogwrtResult.Write);
}
elog(LOG,"=+= second time,need to push,WriteRqst.Write is %llu,XLogCtl->LogwrtResult.Write is %llu",WriteRqst.Write,XLogCtl->LogwrtResult.Write);
// elog(LOG,"=+= second time,need to push,WriteRqst.Write is %llu,XLogCtl->LogwrtResult.Write is %llu",WriteRqst.Write,XLogCtl->LogwrtResult.Write);
SpinLockRelease(&XLogCtl->info_lck);
from = XLogCtl->pages + LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ);
count = LogwrtResult.Write;
printf("write request %ld, result %ld; flush request %ld, result %ld\n", WriteRqst.Write, count,
WriteRqst.Flush, LogwrtResult.Flush);
// printf("write request %ld, result %ld; flush request %ld, result %ld\n", WriteRqst.Write, count,
// WriteRqst.Flush, LogwrtResult.Flush);
XLogRecord *record;
while (count < WriteRqst.Write)
@ -3864,17 +3772,17 @@ mustflush:
if (xlogItemList != NULL)
{
printf("xlogItemList not null, WriteRqst.Write %ld, curLoc %d\n", WriteRqst.Write, curLoc);
// printf("xlogItemList not null, WriteRqst.Write %ld, curLoc %d\n", WriteRqst.Write, curLoc);
if (xlogItemList->head != NULL)
{
struct timeval tv;
long timestp,timenow;
gettimeofday(&tv,NULL);
timestp =tv.tv_sec*1000 + tv.tv_usec/1000;
// struct timeval tv;
// long timestp,timenow;
// gettimeofday(&tv,NULL);
// timestp =tv.tv_sec*1000 + tv.tv_usec/1000;
flushwals(xlogItemList->head, XLogCtl->ThisTimeLineID);
gettimeofday(&tv,NULL);
timenow = tv.tv_sec*1000 + tv.tv_usec/1000;
printf("flushwals time is %ld\n",timenow - timestp);
// gettimeofday(&tv,NULL);
// timenow = tv.tv_sec*1000 + tv.tv_usec/1000;
// printf("flushwals time is %ld\n",timenow - timestp);
freeItemList(xlogItemList);
WalStats.m_wal_write++;
LogwrtResult.Write = WriteRqst.Write;
@ -3886,18 +3794,18 @@ mustflush:
* Update shared-memory status
*/
{
struct timeval tv;
long timestp;
uint64 oldflush;
long unitflush;
int bRelativeOffset = 0;
// struct timeval tv;
// long timestp;
// uint64 oldflush;
// long unitflush;
int bRelativeOffset = 0;
int eRelativeOffset = 0;
SpinLockAcquire(&XLogCtl->info_lck);
XLogParralFlush flushInfo = XLogCtl->LogFlush;
SpinLockRelease(&XLogCtl->info_lck);
printf("end flush wals, begin %d, curLoc %d, WriteRqst.Write %ld\n", flushInfo.begin, curLoc, WriteRqst.Write);
// printf("end flush wals, begin %d, curLoc %d, WriteRqst.Write %ld\n", flushInfo.begin, curLoc, WriteRqst.Write);
while (flushInfo.begin < curLoc)
{
pg_usleep(20L);
@ -3948,19 +3856,19 @@ mustflush:
MemSet((char *)XLogCtl->pages, 0, eRelativeOffset);
}
gettimeofday(&tv,NULL);
timestp =tv.tv_sec*1000 + tv.tv_usec/1000;
if(XLogCtl->timestp == 0)
{
XLogCtl->timestp=timestp;
XLogCtl->ol = 0;
}else if(timestp - XLogCtl->timestp>=1000){
unitflush=(XLogCtl->LogwrtResult.Flush - XLogCtl->ol)*1000/(timestp-XLogCtl->timestp);
printf("---unitflush:%llu---\n---now:%lld---\n---last time:%lld---\n---flush:%llu---\n---last flush:%llu---\n",unitflush,timestp/1000,XLogCtl->timestp/1000,XLogCtl->LogwrtResult.Flush,XLogCtl->ol);
XLogCtl->timestp = timestp;
XLogCtl->ol = XLogCtl->LogwrtResult.Flush;
// gettimeofday(&tv,NULL);
// timestp =tv.tv_sec*1000 + tv.tv_usec/1000;
// if(XLogCtl->timestp == 0)
// {
// XLogCtl->timestp=timestp;
// XLogCtl->ol = 0;
// }else if(timestp - XLogCtl->timestp>=1000){
// unitflush=(XLogCtl->LogwrtResult.Flush - XLogCtl->ol)*1000/(timestp-XLogCtl->timestp);
// printf("---unitflush:%llu---\n---now:%lld---\n---last time:%lld---\n---flush:%llu---\n---last flush:%llu---\n",unitflush,timestp/1000,XLogCtl->timestp/1000,XLogCtl->LogwrtResult.Flush,XLogCtl->ol);
// XLogCtl->timestp = timestp;
// XLogCtl->ol = XLogCtl->LogwrtResult.Flush;
}
// }
SpinLockRelease(&XLogCtl->info_lck);

View File

@ -448,12 +448,7 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
CHECK_FOR_INTERRUPTS();
// smgrread(src, forkNum, blkno, buf.data, GetXLogWriteRecPtr());
smgrread(src, forkNum, blkno, &dataPage, InvalidXLogRecPtr);
for(int i = 0; i < BLCKSZ; i ++) {
buf.data[i] = dataPage[i];
}
free(dataPage);
smgrread(src, forkNum, blkno, buf.data);
if (!PageIsVerifiedExtended(page, blkno,
PIV_LOG_WARNING | PIV_REPORT_STAT))

View File

@ -856,12 +856,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
bool isExtend;
bool isLocalBuf = SmgrIsTemp(smgr);
/* he3db: local tem buffer for pageXlog */
char *pageXlogBuf;
/* he3db: Bytes he3dbsmgrread actually read */
int nbytes;
// char *pageXlogBuf;
*hit = false;
pageXlogBuf = NULL;
// pageXlogBuf = NULL;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@ -1061,13 +1060,9 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (track_io_timing)
INSTR_TIME_SET_CURRENT(io_start);
/* he3db: read page and xlog Associated with it */
XLogRecPtr replayLsn = GetXLogWriteRecPtr();
nbytes = he3dbsmgrread(smgr, forkNum, blockNum, &pageXlogBuf,replayLsn);
memcpy((char *) bufBlock, pageXlogBuf, BLCKSZ);
/* propeller instance no page xlog replay */
free_dataRead(pageXlogBuf, 1, 1);
pageXlogBuf = NULL;
// XLogRecPtr replayLsn = GetXLogWriteRecPtr();
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
if (track_io_timing)
{
@ -1089,12 +1084,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
blockNum,
relpath(smgr->smgr_rnode, forkNum))));
MemSet((char *) bufBlock, 0, BLCKSZ);
/* He3DB: He3FS */
if(pageXlogBuf != NULL)
{
free_dataRead(pageXlogBuf, 1, 1);
pageXlogBuf = NULL;
}
}
else
ereport(ERROR,
@ -1132,7 +1121,9 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
}
else
{
//todo: read related wals in standby instance.
/*
* He3DB: page-replay.
*

View File

@ -197,7 +197,7 @@ extern bool PageIsHot();
typedef struct vfd
{
int64_t fd; /* current FD, or VFD_CLOSED if none */
int fd; /* current FD, or VFD_CLOSED if none */
unsigned short fdstate; /* bitflags for VFD's state */
ResourceOwner resowner; /* owner, for automatic cleanup */
File nextFree; /* link to next free VFD, if in freelist */
@ -982,12 +982,7 @@ count_usable_fds(int max_to_probe, int *usable_fds, int *already_open)
/* release the files we opened */
for (j = 0; j < used; j++)
{
//close(fd[j]);
if(close(fd[j]) != 0)
{
/* He3DB: Add He3FS Compatibility*/
closefs(fd[j]);
}
close(fd[j]);
}
pfree(fd);
@ -1293,9 +1288,7 @@ LruDelete(File file)
* Close the file. We aren't expecting this to fail; if it does, better
* to leak the FD than to mess up our internal state.
*/
//if (close(vfdP->fd) != 0)
/* He3DB: Add He3FS Compatibility*/
if (closefs(vfdP->fd) != 0 && close(vfdP->fd) != 0)
if (close(vfdP->fd) != 0)
elog(vfdP->fdstate & FD_TEMP_FILE_LIMIT ? LOG : data_sync_elevel(LOG),
"could not close file \"%s\": %m", vfdP->fileName);
vfdP->fd = VFD_CLOSED;
@ -1349,7 +1342,7 @@ LruInsert(File file)
* overall system file table being full. So, be prepared to release
* another FD if necessary...
*/
vfdP->fd = He3DBBasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags,
vfdP->fd = BasicOpenFilePerm(vfdP->fileName, vfdP->fileFlags,
vfdP->fileMode);
if (vfdP->fd < 0)
{
@ -3144,16 +3137,7 @@ FreeDesc(AllocateDesc *desc)
result = closedir(desc->desc.dir);
break;
case AllocateDescRawFD:
//result = close(desc->desc.fd);
/* He3DB: Add He3FS Compatibility*/
if(close(desc->desc.fd) == 0 || closefs(desc->desc.fd) == 0)
{
result = 0;
}
else
{
result = 1;
}
result = close(desc->desc.fd);
break;
default:
elog(ERROR, "AllocateDesc kind not recognized");
@ -3221,16 +3205,7 @@ CloseTransientFile(int fd)
/* Only get here if someone passes us a file not in allocatedDescs */
elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
//return close(fd);
/* He3DB: Add He3FS Compatibility*/
if(close(fd) == 0 || closefs(fd) == 0)
{
return 0;
}
else
{
return 1;
}
return close(fd);
}
/*

View File

@ -36,6 +36,7 @@
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/he3db_logindex.h"
#include "storage/ipc.h"
#include "storage/pg_shmem.h"
#include "storage/pmsignal.h"
@ -161,7 +162,8 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, total_addin_request);
/* walloghash code */
size = add_size(size, 1<<30);
// size = add_size(size, 1<<30);
size = add_size(size, He3dbLogIndexShmemSize());
/* cache file size */
size = add_size(size, FileCacheSize());
@ -239,7 +241,8 @@ CreateSharedMemoryAndSemaphores(void)
/*
* set up wal log hash
*/
InitWalLogHash();
// InitWalLogHash();
He3dbLogIndexTblListInit();
/*
* set up fs meta

View File

@ -22,7 +22,8 @@ OBJS = \
predicate.o \
proc.o \
s_lock.o \
spin.o
spin.o \
he3db_logindex.o
include $(top_srcdir)/src/backend/common.mk

View File

@ -0,0 +1,545 @@
#include "postgres.h"
#include "storage/he3db_logindex.h"
#include "storage/shmem.h"
#include "storage/spin.h"
static LogIndexMemList *log_index_mem_list;
static uint64 logindex_mem_tbl_size;
static Size
LogIndexMemListSize(uint64 he3db_logindex_mem_size)
{
Size size;
logindex_mem_tbl_size = (he3db_logindex_mem_size * 1024L * 1024L) / sizeof(LogIndexMemTBL);
size = offsetof(LogIndexMemList, mem_table); // 去除柔性数组之外的空间大小
size = add_size(size, mul_size(sizeof(LogIndexMemTBL), logindex_mem_tbl_size));
size = MAXALIGN(size);//为了使sizeof(struct)向上对齐成为8的倍数的大小
/* The number of logindex memory table is at least 3 */
if (logindex_mem_tbl_size < 3)
elog(FATAL, "The number=%ld of logindex memory table is less than 3", logindex_mem_tbl_size);
else
ereport(LOG, (errmsg("The total log index memory table size is %ld", size)));
return size;
}
static void SetNewPageItem(LogIndexMemTBL *mem_tbl, const BufferTag *page)
{
// set page item
LogIndexMemItemHead *page_head = &(mem_tbl->page_head[mem_tbl->meta.page_free_head-1]);
SpinLockInit(&(page_head->head_lock));
SpinLockAcquire(&(page_head->head_lock));
memcpy(&(page_head->tag), page, sizeof(BufferTag));
page_head->next_item = LOG_INDEX_TBL_INVALID_SEG;
page_head->next_seg = mem_tbl->meta.lsn_free_head;
page_head->tail_seg = mem_tbl->meta.lsn_free_head;
SpinLockRelease(&(page_head->head_lock));
}
// When active table is full, get next free mem table and will change to active mem.
static LogIndexMemTBL *GetNextFreeMemTbl(void)
{
SpinLockInit(&(log_index_mem_list->lock));
SpinLockAcquire(&(log_index_mem_list->lock));
// Circular List
log_index_mem_list->active_table_index = (log_index_mem_list->active_table_index + 1)%(log_index_mem_list->table_cap);
SpinLockRelease(&(log_index_mem_list->lock));
// if all mem table is full, waiting for recycle
while(log_index_mem_list->active_table_index == log_index_mem_list->table_start_index)
{
pg_usleep(10); /* 10 us */
}
// if it finds free mem table will return directly.
return &(log_index_mem_list->mem_table[log_index_mem_list->active_table_index]);
}
static void SetLsnSeg(LogIndexMemItemSeg *lsn_seg, XLogRecPtr lsn){
LOG_INDEX_INSERT_LSN_INFO(lsn_seg, lsn_seg->number, lsn);
lsn_seg->number++;
}
static void SetNewLsnSeg(LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set lsn seg
// first seg index start with 0, seg_item[0]
LogIndexMemItemSeg *lsn_seg = &(mem_tbl->seg_item[mem_tbl->meta.lsn_free_head-1]);
lsn_seg->prev_seg = LOG_INDEX_TBL_INVALID_SEG;
lsn_seg->next_seg = LOG_INDEX_TBL_INVALID_SEG;
SetLsnSeg(lsn_seg, lsn);
}
static void SetNextLsnSeg(LogIndexMemItemHead *page_head, LogIndexMemItemSeg *lsn_seg_old, LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set lsn next seg
LogIndexMemItemSeg *lsn_seg_next = &(mem_tbl->seg_item[mem_tbl->meta.lsn_free_head-1]);
lsn_seg_old->next_seg = mem_tbl->meta.lsn_free_head;
lsn_seg_next->prev_seg = page_head->tail_seg;
lsn_seg_next->next_seg = LOG_INDEX_TBL_INVALID_SEG;
page_head->tail_seg = mem_tbl->meta.lsn_free_head;
SetLsnSeg(lsn_seg_next, lsn);
}
static void UpdateMemTableMetaWithNewPage(LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set metadata for active mem table
SpinLockInit(&(mem_tbl->meta.meta_lock));
SpinLockAcquire(&(mem_tbl->meta.meta_lock));
// set prefix_lsn, min_lsn and max_lsn
LOG_INDEX_MEM_TBL_SET_PREFIX_LSN(mem_tbl, lsn);
mem_tbl->meta.max_lsn = Max(lsn, mem_tbl->meta.max_lsn);
mem_tbl->meta.min_lsn = Min(lsn, mem_tbl->meta.min_lsn);
// page,lsn free index ++
mem_tbl->meta.page_free_head++;
mem_tbl->meta.lsn_free_head++;
SpinLockRelease(&(mem_tbl->meta.meta_lock));
}
static void UpdateMemTableMetaWithNextPage(LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set metadata for active mem table
SpinLockInit(&(mem_tbl->meta.meta_lock));
SpinLockAcquire(&(mem_tbl->meta.meta_lock));
// set prefix_lsn, min_lsn and max_lsn
mem_tbl->meta.max_lsn = Max(lsn, mem_tbl->meta.max_lsn);
mem_tbl->meta.min_lsn = Min(lsn, mem_tbl->meta.min_lsn);
// page,lsn free index ++
mem_tbl->meta.page_free_head++;
mem_tbl->meta.lsn_free_head++;
SpinLockRelease(&(mem_tbl->meta.meta_lock));
}
static void UpdateMemTableMetaWithNextSeg(LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set metadata for active mem table
SpinLockInit(&(mem_tbl->meta.meta_lock));
SpinLockAcquire(&(mem_tbl->meta.meta_lock));
mem_tbl->meta.max_lsn = Max(lsn, mem_tbl->meta.max_lsn);
mem_tbl->meta.min_lsn = Min(lsn, mem_tbl->meta.min_lsn);
mem_tbl->meta.lsn_free_head++;
SpinLockRelease(&(mem_tbl->meta.meta_lock));
}
static void UpdateMemTableMetaWithCurrentSeg(LogIndexMemTBL *mem_tbl, XLogRecPtr lsn)
{
// set metadata for active mem table
SpinLockInit(&(mem_tbl->meta.meta_lock));
SpinLockAcquire(&(mem_tbl->meta.meta_lock));
mem_tbl->meta.max_lsn = Max(lsn, mem_tbl->meta.max_lsn);
mem_tbl->meta.min_lsn = Min(lsn, mem_tbl->meta.min_lsn);
SpinLockRelease(&(mem_tbl->meta.meta_lock));
}
static void SetActiveTblWithFirstPage(LogIndexMemTBL *mem_tbl, const BufferTag *page, XLogRecPtr lsn)
{
uint32 hash_key;
// set mem table state to active
pg_atomic_write_u32(&(mem_tbl->meta.state), LOG_INDEX_MEM_TBL_STATE_ACTIVE);
// index start with 1, 0 means INVALID. hash[] all values will be 0 after init, so set to 1 when first use.
mem_tbl->meta.lsn_free_head = 1;
mem_tbl->meta.page_free_head = 1;
// calculate hashcode by buffer tag
hash_key = LOG_INDEX_MEM_TBL_HASH_PAGE(page);
mem_tbl->hash[hash_key] = mem_tbl->meta.page_free_head;
// set page item
SetNewPageItem(mem_tbl, page);
// set lsn seg
SetNewLsnSeg(mem_tbl, lsn);
// set metadata for active mem table
UpdateMemTableMetaWithNewPage(mem_tbl, lsn);
}
static void InsertLsnWhenOldTblIsFull(LogIndexMemTBL *mem_tbl_old, const BufferTag *page, XLogRecPtr lsn)
{
LogIndexMemTBL *mem_tbl_new;
// set mem table state to inactive
pg_atomic_write_u32(&(mem_tbl_old->meta.state), LOG_INDEX_MEM_TBL_STATE_INACTIVE);
mem_tbl_new = GetNextFreeMemTbl();
SetActiveTblWithFirstPage(mem_tbl_new, page, lsn);
}
static void SetNextPageItem(LogIndexMemTBL *mem_tbl, const BufferTag *page, XLogRecPtr lsn)
{
// there's no free page_head or lsn_seg, means current active is full, will apply for new mem table as active table
if (mem_tbl->meta.page_free_head > LOG_INDEX_MEM_TBL_PAGE_NUM || mem_tbl->meta.lsn_free_head > LOG_INDEX_MEM_TBL_SEG_NUM)
{
// no free page head in active mem table, will apply for new mem table
InsertLsnWhenOldTblIsFull(mem_tbl, page, lsn);
}
else
{
// set new page and lsn seg when active mem table have free resource
SetNewPageItem(mem_tbl, page);
SetNewLsnSeg(mem_tbl, lsn);
UpdateMemTableMetaWithNewPage(mem_tbl, lsn);
}
}
static void RestMemTable(LogIndexMemTBL *mem_tbl)
{
// reset table's metadata
mem_tbl->meta.id = LOG_INDEX_TABLE_INVALID_ID;
pg_atomic_write_u32(&(mem_tbl->meta.state), LOG_INDEX_MEM_TBL_STATE_FREE);
mem_tbl->meta.page_free_head = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->meta.lsn_free_head = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->meta.min_lsn = UINT64_MAX;
mem_tbl->meta.max_lsn = InvalidXLogRecPtr;
mem_tbl->meta.prefix_lsn = 0;
// reset hash[] and page head[]
for(int i = 0; i < LOG_INDEX_MEM_TBL_PAGE_NUM; i++)
{
mem_tbl->hash[i] = LOG_INDEX_TBL_INVALID_SEG;
CLEAR_BUFFERTAG(mem_tbl->page_head[i].tag);
mem_tbl->page_head[i].next_item = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->page_head[i].next_seg = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->page_head[i].tail_seg = LOG_INDEX_TBL_INVALID_SEG;
// reset seg_item[]
mem_tbl->seg_item[i].prev_seg = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->seg_item[i].next_seg = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->seg_item[i].number = 0;
}
// reset seg_item[]
for(int i = LOG_INDEX_MEM_TBL_PAGE_NUM; i < LOG_INDEX_MEM_TBL_SEG_NUM; i++){
mem_tbl->seg_item[i].prev_seg = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->seg_item[i].next_seg = LOG_INDEX_TBL_INVALID_SEG;
mem_tbl->seg_item[i].number = 0;
}
}
static LsnNode *InitLsnNode()
{
LsnNode *head;
head = (LsnNode *)malloc(sizeof(LsnNode));
head->next = NULL;
return head;
}
// insert nodelist from head, eg: before: head-->node1-->NULL, after: head-->newNode-->node1-->NULL
static void InsertLsnNodeByHead(LsnNode *head, XLogRecPtr lsn)
{
LsnNode *new_node;
new_node = (LsnNode *)malloc(sizeof(LsnNode));
new_node->lsn = lsn;
new_node->next = head->next;
head->next = new_node;
}
// print nodelist
static void PrintLsnNode(LsnNode *head)
{
LsnNode *p;
p = head->next;
while (p) {
printf(" %d\t ", p->lsn);
p = p->next;
}
}
static void ReverseLsnNode(LsnNode *head)
{
if (head == NULL || head->next == NULL) {
return;
}
LsnNode *p = NULL;
LsnNode *q = head->next;
LsnNode *next ;
while (q != NULL) {
next = q->next;
q->next = p;
p = q;
q = next;
}
head->next=p;
}
static uint16 FindFirstLsnSegInMemTblByPageTag(LogIndexMemTBL *mem_tbl, const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
{
LogIndexMemItemHead *page_head;
uint32 hash_key;
// end_lsn <= min_lsn or start_lsn > max_lsn means the request lsn region not in this mem table
if(mem_tbl->meta.min_lsn >= end_lsn || mem_tbl->meta.max_lsn < start_lsn)
{
return LOG_INDEX_TBL_INVALID_SEG;
}else{
hash_key = LOG_INDEX_MEM_TBL_HASH_PAGE(page);
if(mem_tbl->hash[hash_key] != LOG_INDEX_TBL_INVALID_SEG)
{
page_head = &(mem_tbl->page_head[mem_tbl->hash[hash_key]-1]);
while(!BUFFERTAGS_EQUAL(page_head->tag, *page)){
if(page_head->next_item == LOG_INDEX_TBL_INVALID_SEG)
{
return LOG_INDEX_TBL_INVALID_SEG;
}
page_head = &(mem_tbl->page_head[page_head->next_item-1]);
}
// find request page, return lsn seg
return (page_head->next_seg);
}else
{
return LOG_INDEX_TBL_INVALID_SEG;
}
}
}
void He3dbLogIndexTblListInit(void)
{
bool found_logindex;
log_index_mem_list = (LogIndexMemList *)
ShmemInitStruct("log index", LogIndexMemListSize(he3db_logindex_mem_size), &found_logindex);
Assert(log_index_mem_list != NULL);
log_index_mem_list->table_start_index = 0;
log_index_mem_list->active_table_index = 0;
log_index_mem_list->table_cap = logindex_mem_tbl_size;
for (uint64 i = 0; i < log_index_mem_list->table_cap; i++) {
// set mem table init values
log_index_mem_list->mem_table[i].meta.id = i + 1;
log_index_mem_list->mem_table[i].meta.min_lsn = UINT64_MAX;
log_index_mem_list->mem_table[i].meta.max_lsn = InvalidXLogRecPtr;
pg_atomic_write_u32(&(log_index_mem_list->mem_table[i].meta.state), LOG_INDEX_MEM_TBL_STATE_FREE);
}
}
uint64 GetMemTblSize(void)
{
return log_index_mem_list->table_cap;
}
void InsertLogIndexByPage(const BufferTag *page, XLogRecPtr lsn)
{
LogIndexMemItemSeg *lsn_seg;
uint32 hash_key;
LogIndexMemTBL *mem_tbl;
LogIndexMemItemHead *page_head;
// calculate hashcode by buffer tag
hash_key = LOG_INDEX_MEM_TBL_HASH_PAGE(page);
// get active mem table
mem_tbl = &(log_index_mem_list->mem_table[log_index_mem_list->active_table_index]);
// first time to use active mem table
if(pg_atomic_read_u32(&mem_tbl->meta.state) == LOG_INDEX_MEM_TBL_STATE_FREE)
{
SetActiveTblWithFirstPage(mem_tbl, page, lsn);
}
else
{
// if have same lsn prefix with active table
if(LOG_INDEX_SAME_TABLE_LSN_PREFIX(mem_tbl, lsn))
{
// 0 means INVALID, also means page don't exist in active mem table
if(mem_tbl->hash[hash_key] == 0)
{
// set hash value to next free head
mem_tbl->hash[hash_key] = mem_tbl->meta.page_free_head;
SetNextPageItem(mem_tbl, page, lsn);
}
else
{
// page already exist or hash conflict
// get exist page item
page_head = &(mem_tbl->page_head[mem_tbl->hash[hash_key]-1]);
/* if item page tag equal to current tag, true insert lsn to lsn_seg,
* false loop for next_item until equal or not found one. Then apply new page_item and lsn_seg.
*/
while(!BUFFERTAGS_EQUAL(page_head->tag, *page)){
if(page_head->next_item == LOG_INDEX_TBL_INVALID_SEG)
{
// apply new page item
// there's no free page_head or lsn_seg, means current active is full, will apply for new mem table as active table
if (mem_tbl->meta.page_free_head > LOG_INDEX_MEM_TBL_PAGE_NUM || mem_tbl->meta.lsn_free_head > LOG_INDEX_MEM_TBL_SEG_NUM)
{
// no free page head in active mem table, will apply for new mem table
InsertLsnWhenOldTblIsFull(mem_tbl, page, lsn);
}
else
{
// set new page and lsn seg when active mem table have free resource
// set old page item's next_item to new one.
page_head->next_item = mem_tbl->meta.page_free_head;
// set page item
SetNewPageItem(mem_tbl, page);
SetNewLsnSeg(mem_tbl, lsn);
UpdateMemTableMetaWithNextPage(mem_tbl, lsn);
}
return;
}
page_head = &(mem_tbl->page_head[page_head->next_item-1]);
}
// find same tag's page_head
lsn_seg = &(mem_tbl->seg_item[page_head->tail_seg-1]);
// if current seg full?
if(lsn_seg->number < LOG_INDEX_MEM_ITEM_SEG_LSN_NUM)
{
// insert lsn to seg
SetLsnSeg(lsn_seg, lsn);
UpdateMemTableMetaWithCurrentSeg(mem_tbl, lsn);
}
else
{
if(mem_tbl->meta.lsn_free_head > LOG_INDEX_MEM_TBL_SEG_NUM)
{
// no free page head in active mem table, will apply for new mem table
InsertLsnWhenOldTblIsFull(mem_tbl, page, lsn);
}
else
{
// apply new seg and insert lsn
SetNextLsnSeg(page_head, lsn_seg, mem_tbl, lsn);
UpdateMemTableMetaWithNextSeg(mem_tbl, lsn);
}
}
}
}
else
{
// prefix of lsn is different, so cannot use current active table, will apply new mem table
InsertLsnWhenOldTblIsFull(mem_tbl, page, lsn);
}
}
}
LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
{
LsnNode *head_node;
uint64 tbl_index;
// Prevent metadata changes during discovery.
SpinLockInit(&(log_index_mem_list->lock));
SpinLockAcquire(&(log_index_mem_list->lock));
head_node = InitLsnNode();
// just one mem table
if(log_index_mem_list->table_start_index == log_index_mem_list->active_table_index)
{
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[log_index_mem_list->active_table_index]);
// get index of current table's seg
uint16 seg_index = FindFirstLsnSegInMemTblByPageTag(mem_tbl, page, start_lsn, end_lsn);
while (seg_index != LOG_INDEX_TBL_INVALID_SEG)
{
LogIndexMemItemSeg *item_seg = &(mem_tbl->seg_item[seg_index - 1]);
// loop for lsn list
for(int i=0; i < item_seg->number; i++){
XLogRecPtr lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, item_seg->suffix_lsn[i]);
if(lsn >= start_lsn)
{
if(lsn < end_lsn)
{
InsertLsnNodeByHead(head_node, lsn);
}else{
ReverseLsnNode(head_node);
SpinLockRelease(&(log_index_mem_list->lock));
return head_node;
}
}else
{
continue;
}
}
seg_index = item_seg->next_seg;
}
ReverseLsnNode(head_node);
SpinLockRelease(&(log_index_mem_list->lock));
return head_node;
}
tbl_index = log_index_mem_list->table_start_index;
while(tbl_index != log_index_mem_list->active_table_index)
{
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[tbl_index]);
tbl_index = (tbl_index + 1)%(log_index_mem_list->table_cap);
// current mem table no suitability lsn_list
if(mem_tbl->meta.max_lsn < start_lsn)
{
continue;
}else if(mem_tbl->meta.min_lsn > end_lsn)
{
// there is no suitability lsn_list after this mem table
break;
} else
{
// get index of current table's seg
uint16 seg_index = FindFirstLsnSegInMemTblByPageTag(mem_tbl, page, start_lsn, end_lsn);
while (seg_index != LOG_INDEX_TBL_INVALID_SEG)
{
LogIndexMemItemSeg *item_seg = &(mem_tbl->seg_item[seg_index - 1]);
// loop for lsn list
for(int i=0; i < item_seg->number; i++){
XLogRecPtr lsn = LOG_INDEX_COMBINE_LSN(mem_tbl, item_seg->suffix_lsn[i]);
if(lsn >= start_lsn)
{
if(lsn < end_lsn)
{
InsertLsnNodeByHead(head_node, lsn);
}else{
ReverseLsnNode(head_node);
SpinLockRelease(&(log_index_mem_list->lock));
return head_node;
}
}else
{
continue;
}
}
seg_index = item_seg->next_seg;
}
}
}
ReverseLsnNode(head_node);
SpinLockRelease(&(log_index_mem_list->lock));
return head_node;
}
/* cleanup useless mem table which max_lsn less than consist_lsn,
* and reset mem table to reuse.
*/
void CleanLogIndexByPage(XLogRecPtr consist_lsn)
{
SpinLockInit(&(log_index_mem_list->lock));
SpinLockAcquire(&(log_index_mem_list->lock));
// loop mem table from table_start_index
while(log_index_mem_list->table_start_index != log_index_mem_list->active_table_index)
{
LogIndexMemTBL *mem_tbl = &(log_index_mem_list->mem_table[log_index_mem_list->table_start_index]);
// max_lsn large than consistLsn? true: cannot cleanup and reuse just break; false: cleanup
if (mem_tbl->meta.max_lsn >= consist_lsn || pg_atomic_read_u32(&mem_tbl->meta.state) != LOG_INDEX_MEM_TBL_STATE_INACTIVE)
{
break;
}
RestMemTable(mem_tbl);
log_index_mem_list->table_start_index = (log_index_mem_list->table_start_index + 1)%(log_index_mem_list->table_cap);
}
SpinLockRelease(&(log_index_mem_list->lock));
}
Size He3dbLogIndexShmemSize(void)
{
Size size = 0;
if (he3db_logindex_mem_size <= 0)
return size;
size = LogIndexMemListSize(he3db_logindex_mem_size);
return CACHELINEALIGN(size);
}
void FreeLsnNode(LsnNode *head)
{
LsnNode* pb;
while (head != NULL)
{
pb = head;
head = head->next;
free(pb);
pb = NULL;
}
}

View File

@ -205,15 +205,15 @@ mdcreate(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
path = relpath(reln->smgr_rnode, forkNum);
// fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY | PG_O_DIRECT);
fd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
fd = He3DBPathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY | PG_O_DIRECT);
// fd = He3DBPathNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY | PG_O_DIRECT);
if (fd < 0)
{
int save_errno = errno;
if (isRedo)
fd = He3DBPathNameOpenFile(path, O_RDWR | PG_BINARY | PG_O_DIRECT);
fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
if (fd < 0)
{
/* be sure to report the error reported by create, not open */
@ -308,11 +308,7 @@ do_truncate(const char *path)
//ret = pg_truncate(path, 0);
if (push_standby)
{
fd = He3DBBasicOpenFilePerm(path, O_RDWR | PG_BINARY, pg_file_create_mode);
if (fd < 0)
return -1;
ret = truncatefs(fd, 0);
closefs(fd);
ret = pg_truncate(path, 0);
}
else
{
@ -469,7 +465,7 @@ mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
if ((nbytes = He3DBFileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
if ((nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_EXTEND)) != BLCKSZ)
{
if (nbytes < 0)
ereport(ERROR,
@ -516,7 +512,7 @@ mdopenfork(SMgrRelation reln, ForkNumber forknum, int behavior)
path = relpath(reln->smgr_rnode, forknum);
/* he3db: He3FS replace OSFS and Use the direct method to open the page file */
fd = He3DBPathNameOpenFile(path, O_RDWR | PG_BINARY | PG_O_DIRECT);
fd = PathNameOpenFile(path, O_RDWR | PG_BINARY);
if (fd < 0)
{
@ -571,7 +567,7 @@ mdclose(SMgrRelation reln, ForkNumber forknum)
{
MdfdVec *v = &reln->md_seg_fds[forknum][nopensegs - 1];
He3DBFileClose(v->mdfd_vfd);
FileClose(v->mdfd_vfd);
_fdvec_resize(reln, forknum, nopensegs - 1);
nopensegs--;
}
@ -659,7 +655,7 @@ mdwriteback(SMgrRelation reln, ForkNumber forknum,
*/
void
mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer, XLogRecPtr lsn)
char *buffer)
{
off_t seekpos;
int nbytes;
@ -680,7 +676,7 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
nbytes = He3DBFileRead(v->mdfd_vfd, &buffer, seekpos, WAIT_EVENT_DATA_FILE_READ, lsn, reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode,segno,forknum);
nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
reln->smgr_rnode.node.spcNode,
@ -853,7 +849,7 @@ mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
nbytes = He3DBFileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
nbytes = FileWrite(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
TRACE_POSTGRESQL_SMGR_MD_WRITE_DONE(forknum, blocknum,
reln->smgr_rnode.node.spcNode,
@ -993,7 +989,7 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
* This segment is no longer active. We truncate the file, but do
* not delete it, for reasons explained in the header comments.
*/
if (He3FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE,SmgrIsTemp(reln)) < 0)
if (FileTruncate(v->mdfd_vfd, 0, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not truncate file \"%s\": %m",
@ -1005,7 +1001,7 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
/* we never drop the 1st segment */
Assert(v != &reln->md_seg_fds[forknum][0]);
He3DBFileClose(v->mdfd_vfd);
FileClose(v->mdfd_vfd);
_fdvec_resize(reln, forknum, curopensegs - 1);
}
else if (priorblocks + ((BlockNumber) RELSEG_SIZE) > nblocks)
@ -1019,7 +1015,7 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
*/
BlockNumber lastsegblocks = nblocks - priorblocks;
if (He3FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE,SmgrIsTemp(reln)) < 0)
if (FileTruncate(v->mdfd_vfd, (off_t) lastsegblocks * BLCKSZ, WAIT_EVENT_DATA_FILE_TRUNCATE) < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not truncate file \"%s\" to %u blocks: %m",
@ -1091,7 +1087,7 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
/* Close inactive segments immediately */
if (segno > min_inactive_seg)
{
He3DBFileClose(v->mdfd_vfd);
FileClose(v->mdfd_vfd);
_fdvec_resize(reln, forknum, segno - 1);
}
@ -1287,8 +1283,7 @@ _mdfd_openseg(SMgrRelation reln, ForkNumber forknum, BlockNumber segno,
fullpath = _mdfd_segpath(reln, forknum, segno);
/* open the file */
/* he3db: He3FS replace OSFS and Use the direct method to open the page file */
fd = He3DBPathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags | PG_O_DIRECT);
fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY | oflags);
pfree(fullpath);
@ -1452,7 +1447,7 @@ _mdnblocks(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
{
off_t len;
len = He3DBFileSize(seg->mdfd_vfd);
len = FileSize(seg->mdfd_vfd);
if (len < 0)
ereport(ERROR,
(errcode_for_file_access(),
@ -1493,8 +1488,8 @@ mdsyncfiletag(const FileTag *ftag, char *path)
pfree(p);
/* He3DB:He3FS replace OSFS */
//file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
file = He3DBPathNameOpenFile(path, O_RDWR | PG_BINARY);
file = PathNameOpenFile(path, O_RDWR | PG_BINARY);
// file = He3DBPathNameOpenFile(path, O_RDWR | PG_BINARY);
if (file < 0)
return -1;
need_to_close = true;
@ -1505,7 +1500,7 @@ mdsyncfiletag(const FileTag *ftag, char *path)
save_errno = errno;
if (need_to_close)
He3DBFileClose(file);
FileClose(file);
errno = save_errno;
return result;

View File

@ -55,7 +55,7 @@ typedef struct f_smgr
bool (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
int (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char **buffer, bool onlyPage, XLogRecPtr lsn);
BlockNumber blocknum, char *buffer);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
@ -78,7 +78,8 @@ static const f_smgr smgrsw[] = {
.smgr_unlink = mdunlink,
.smgr_extend = mdextend,
.smgr_prefetch = mdprefetch,
.smgr_read = he3db_mdread,
// .smgr_read = he3db_mdread,
.smgr_read = mdread,
.smgr_write = mdwrite,
.smgr_writeback = mdwriteback,
.smgr_nblocks = mdnblocks,
@ -471,7 +472,7 @@ smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
//if ((push_standby != true && EnableHotStandby != true) || IsBootstrapProcessingMode() || InitdbSingle) {
smgrsw[reln->smgr_which].smgr_extend(reln, forknum, blocknum,
buffer, skipFsync);
elog(LOG,"smgrextend reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
// elog(LOG,"smgrextend reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
//}
/*
@ -516,9 +517,9 @@ smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
*/
void
smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char **buffer, XLogRecPtr lsn)
char *buffer)
{
smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer, false, lsn);
smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer);
}
/*
@ -527,12 +528,12 @@ smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
* Modified points:
* 1)return read bytes
*/
int
he3dbsmgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char **buffer, XLogRecPtr lsn)
{
return smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer, true, lsn);
}
// int
// he3dbsmgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
// char **buffer, XLogRecPtr lsn)
// {
// return smgrsw[reln->smgr_which].smgr_read(reln, forknum, blocknum, buffer, true, lsn);
// }
/*
* smgrwrite() -- Write the supplied buffer out.
@ -556,7 +557,7 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
//if (push_standby == true || SmgrIsTemp(reln)) {
smgrsw[reln->smgr_which].smgr_write(reln, forknum, blocknum,
buffer, skipFsync);
elog(LOG,"smgrwrite reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
// elog(LOG,"smgrwrite reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
//}
}
@ -572,7 +573,7 @@ smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
//if (push_standby == true || SmgrIsTemp(reln)) {
smgrsw[reln->smgr_which].smgr_writeback(reln, forknum, blocknum,
nblocks);
elog(LOG,"smgrwriteback reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
// elog(LOG,"smgrwriteback reln %d,flk %d,blk %d",reln->smgr_rnode.node.relNode,forknum,blocknum);
//}
}
@ -601,7 +602,7 @@ smgrnblocks(SMgrRelation reln, ForkNumber forknum)
}
result = smgrsw[reln->smgr_which].smgr_nblocks(reln, forknum);
elog(LOG, "===exec lseek ===");
// elog(LOG, "===exec lseek ===");
if (cached_reln == NULL)
SetupRelCache(&reln->smgr_rnode.node, forknum, result);
else

View File

@ -627,6 +627,11 @@ int ssl_renegotiation_limit;
int huge_pages;
int huge_page_size;
/* he3db logindex mem-table size (unit MB), according to this value we can calculate
* the number of mem table.
*/
int he3db_logindex_mem_size;
/*
* These variables are all dummies that don't do anything, except in some
* cases provide the value for SHOW to display. The real state is elsewhere
@ -3561,6 +3566,17 @@ static struct config_int ConfigureNamesInt[] =
check_client_connection_check_interval, NULL, NULL
},
{
{"he3db_logindex_mem_size", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Set the size for logindex memory table"),
NULL,
GUC_UNIT_MB
},
&he3db_logindex_mem_size,
512, 0, INT_MAX / 2,
NULL, NULL, NULL
},
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL

View File

@ -54,7 +54,7 @@ typedef enum RecoveryInitSyncMethod
struct iovec; /* avoid including port/pg_iovec.h here */
typedef int64_t File;
typedef int File;
/* GUC parameter */

View File

@ -0,0 +1,105 @@
#ifndef HE3DB_LOGINDEX_H
#define HE3DB_LOGINDEX_H
#include "access/xlog.h"
#include "common/hashfn.h"
#include "port/atomics.h"
#include "storage/lockdefs.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/s_lock.h"
#include "storage/buf_internals.h"
#define LOG_INDEX_MEM_ITEM_SEG_LSN_NUM 10
#define LOG_INDEX_MEM_TBL_SEG_NUM 4096
#define LOG_INDEX_MEM_TBL_PAGE_NUM (LOG_INDEX_MEM_TBL_SEG_NUM/2)
#define LOG_INDEX_TABLE_INVALID_ID 0
#define LOG_INDEX_TBL_INVALID_SEG 0
#define LOG_INDEX_MEM_TBL_STATE_FREE (0x00)
#define LOG_INDEX_MEM_TBL_STATE_ACTIVE (0x01)
#define LOG_INDEX_MEM_TBL_STATE_INACTIVE (0x02)
#define LOG_INDEX_MEM_TBL_STATE_FLUSHED (0x04)
#define LOG_INDEX_MEM_TBL_HASH_PAGE(tag) \
(tag_hash(tag, sizeof(BufferTag)) % LOG_INDEX_MEM_TBL_PAGE_NUM)
#define LOG_INDEX_SAME_TABLE_LSN_PREFIX(table, lsn) ((table)->meta.prefix_lsn == ((lsn) >> 32))
#define LOG_INDEX_MEM_TBL_SET_PREFIX_LSN(table, lsn) \
{ \
(table)->meta.prefix_lsn = ((lsn) >> 32) ; \
}
#define LOG_INDEX_INSERT_LSN_INFO(lsn_seg, number, lsn) \
{ \
(lsn_seg)->suffix_lsn[(number)] = ((lsn << 32) >> 32); \
}
#define LOG_INDEX_COMBINE_LSN(table, suffix) \
((((XLogRecPtr)((table)->meta.prefix_lsn)) << 32) | (suffix))
// metadata of log index mem table; size:37
typedef struct LogIndexMemMeta
{
uint64 id;
pg_atomic_uint32 state;
uint16 page_free_head; // free location for LogIndexMemItemHead
uint16 lsn_free_head; // free location for LogIndexMemItemSeg
XLogRecPtr min_lsn;
XLogRecPtr max_lsn;
uint32 prefix_lsn;
slock_t meta_lock;
} LogIndexMemMeta;
// log index value, prefix of page head; size: 20+2+2+1=25
typedef struct LogIndexMemItemHead
{
BufferTag tag;
uint16 next_item;
uint16 next_seg;
uint16 tail_seg;
slock_t head_lock;
} LogIndexMemItemHead;
// save page suffix lsn; size: 2+1+4*10=43
typedef struct LogIndexMemItemSeg
{
uint16 prev_seg;
uint16 next_seg;
uint8 number;
uint32 suffix_lsn[LOG_INDEX_MEM_ITEM_SEG_LSN_NUM];
} LogIndexMemItemSeg;
// log index mem table; size: 37+25*2048+43*4096+2*2048=231461≈226kB
typedef struct LogIndexMemTBL
{
LogIndexMemMeta meta;
uint16 hash[LOG_INDEX_MEM_TBL_PAGE_NUM];
LogIndexMemItemHead page_head[LOG_INDEX_MEM_TBL_PAGE_NUM];
LogIndexMemItemSeg seg_item[LOG_INDEX_MEM_TBL_SEG_NUM];
} LogIndexMemTBL;
// list of log index mem tables
typedef struct LogIndexMemList
{
uint64 table_start_index; // first mem_table index, will change by remove unless inactive table
uint64 active_table_index; // current mem_table index
uint64 table_cap;
slock_t lock;
LogIndexMemTBL mem_table[FLEXIBLE_ARRAY_MEMBER];
} LogIndexMemList;
// lsn listNode
typedef struct LsnNode {
XLogRecPtr lsn;
struct LsnNode * next;
} LsnNode;
extern int he3db_logindex_mem_size;
extern Size He3dbLogIndexShmemSize(void);
extern uint64 GetMemTblSize(void);
extern void He3dbLogIndexTblListInit(void);
extern void InsertLogIndexByPage(const BufferTag *page, XLogRecPtr lsn);
extern void CleanLogIndexByPage(XLogRecPtr consistLsn);
extern LsnNode *GetLogIndexByPage(const BufferTag *page, XLogRecPtr start_lsn, XLogRecPtr end_lsn);
extern void FreeLsnNode(LsnNode *head);
#endif /* HE3DB_LOGINDEX_H */

View File

@ -31,7 +31,7 @@ extern void mdextend(SMgrRelation reln, ForkNumber forknum,
extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer, XLogRecPtr lsn);
char *buffer);
extern int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char **buffer, XLogRecPtr lsn);
extern int he3db_mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,

View File

@ -93,7 +93,7 @@ extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char **buffer, XLogRecPtr lsn);
BlockNumber blocknum, char *buffer);
extern int he3dbsmgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char **buffer, XLogRecPtr lsn);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,