Merge branch 'dev_performance' of gitee.com:he3db/he3pg into dev_performance

This commit is contained in:
shenzhengntu 2023-03-24 14:51:43 +08:00
commit ddcb255c66
17 changed files with 715 additions and 904 deletions

View File

@ -55,7 +55,6 @@
#include "port/pg_iovec.h"
#include "postmaster/bgwriter.h"
#include "postmaster/startup.h"
#include "postmaster/relationcleanup.h"
#include "postmaster/walwriter.h"
#include "replication/basebackup.h"
#include "replication/logical.h"
@ -3382,8 +3381,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
INSTR_TIME_SET_CURRENT(start);
pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
// written = pg_pwrite(openLogFile, from, nleft, startoffset);
written = writefs(openLogFile, from, nleft, startoffset);
written = pg_pwrite(openLogFile, from, nleft, startoffset);
pgstat_report_wait_end();
/*
@ -3571,102 +3569,102 @@ void freeItemList(XLogItemList *xlogItemList)
xlogItemList->tail = xlogItemList->head = NULL;
}
static void
He3DBXLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{
LogwrtResult = XLogCtl->LogwrtResult;
char *from;
uint8 part[4];
int count;
int stp;
int xlogLength;
// static void
// He3DBXLogWrite(XLogwrtRqst WriteRqst, bool flexible)
// {
// LogwrtResult = XLogCtl->LogwrtResult;
// char *from;
// uint8 part[4];
// int count;
// int stp;
// int xlogLength;
from = XLogCtl->pages + LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ);
count = LogwrtResult.Write;
// printf("invoke xlog write, upto %ld\n", WriteRqst.Write);
while (count < WriteRqst.Write)
{
//TODO splite the xlog_buffer and construct k-v pairs
if (count < WriteRqst.Write)
{
if (xlogItemList == NULL)
{
xlogItemList = (XLogItemList *)malloc(sizeof(XLogItemList));
xlogItemList->head = NULL;
xlogItemList->tail = NULL;
}
for(stp = 0; stp < 4; stp ++)
{
part[stp] = *(from + stp);
}
xlogLength = GetXlogLength(part,4);
XLogItem *xlogItem = (XLogItem *)malloc(sizeof(XLogItem));
// from = XLogCtl->pages + LogwrtResult.Write % ((XLOGbuffers-1) * XLOG_BLCKSZ);
// count = LogwrtResult.Write;
// // printf("invoke xlog write, upto %ld\n", WriteRqst.Write);
// while (count < WriteRqst.Write)
// {
// //TODO splite the xlog_buffer and construct k-v pairs
// if (count < WriteRqst.Write)
// {
// if (xlogItemList == NULL)
// {
// xlogItemList = (XLogItemList *)malloc(sizeof(XLogItemList));
// xlogItemList->head = NULL;
// xlogItemList->tail = NULL;
// }
// for(stp = 0; stp < 4; stp ++)
// {
// part[stp] = *(from + stp);
// }
// xlogLength = GetXlogLength(part,4);
// XLogItem *xlogItem = (XLogItem *)malloc(sizeof(XLogItem));
(xlogItem->xlogKey).lsn = count;
xlogItem->begin = from;
// (xlogItem->xlogKey).lsn = count;
// xlogItem->begin = from;
if ((count + xlogLength)/((XLOGbuffers-1) * XLOG_BLCKSZ) > count / ((XLOGbuffers-1) * XLOG_BLCKSZ))
{
from = XLogCtl->pages + ((count + xlogLength)%((XLOGbuffers-1) * XLOG_BLCKSZ));
}
else
{
from = from + xlogLength;
}
// if ((count + xlogLength)/((XLOGbuffers-1) * XLOG_BLCKSZ) > count / ((XLOGbuffers-1) * XLOG_BLCKSZ))
// {
// from = XLogCtl->pages + ((count + xlogLength)%((XLOGbuffers-1) * XLOG_BLCKSZ));
// }
// else
// {
// from = from + xlogLength;
// }
xlogItem->length = xlogLength;
xlogItem->next = NULL;
if (xlogItemList->head == NULL)
{
xlogItemList->head = xlogItem;
xlogItemList->tail = xlogItemList->head;
}
else
{
xlogItemList->tail->next = xlogItem;
xlogItemList->tail = xlogItemList->tail->next;
}
count += xlogLength;
}
}
// xlogItem->length = xlogLength;
// xlogItem->next = NULL;
// if (xlogItemList->head == NULL)
// {
// xlogItemList->head = xlogItem;
// xlogItemList->tail = xlogItemList->head;
// }
// else
// {
// xlogItemList->tail->next = xlogItem;
// xlogItemList->tail = xlogItemList->tail->next;
// }
// count += xlogLength;
// }
// }
//TODO flush into queue
if (xlogItemList != NULL)
{
if (xlogItemList->head != NULL)
{
kvwrite(xlogItemList->head);
freeItemList(xlogItemList);
WalStats.m_wal_write++;
LogwrtResult.Write = WriteRqst.Write;
}
free(xlogItemList);
xlogItemList = NULL;
}
// //TODO flush into queue
// if (xlogItemList != NULL)
// {
// if (xlogItemList->head != NULL)
// {
// kvwrite(xlogItemList->head);
// freeItemList(xlogItemList);
// WalStats.m_wal_write++;
// LogwrtResult.Write = WriteRqst.Write;
// }
// free(xlogItemList);
// xlogItemList = NULL;
// }
if (LogwrtResult.Flush < WriteRqst.Flush &&
LogwrtResult.Flush < LogwrtResult.Write)
{
//TODO signal flush
kvflush(LogwrtResult.Write);
LogwrtResult.Flush = LogwrtResult.Write;
}
// if (LogwrtResult.Flush < WriteRqst.Flush &&
// LogwrtResult.Flush < LogwrtResult.Write)
// {
// //TODO signal flush
// kvflush(LogwrtResult.Write);
// LogwrtResult.Flush = LogwrtResult.Write;
// }
/*
* Update shared-memory status
*/
{
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->LogwrtResult = LogwrtResult;
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
XLogCtl->globalUpto = 0;
SpinLockRelease(&XLogCtl->info_lck);
}
}
// /*
// * Update shared-memory status
// */
// {
// SpinLockAcquire(&XLogCtl->info_lck);
// XLogCtl->LogwrtResult = LogwrtResult;
// if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
// XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
// if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
// XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
// XLogCtl->globalUpto = 0;
// SpinLockRelease(&XLogCtl->info_lck);
// }
// }
static void
FlushWal(XLogwrtRqst WriteRqst)
@ -4854,8 +4852,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
*/
if (*use_existent)
{
// fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
if (fd < 0)
{
if (errno != ENOENT)
@ -5013,8 +5010,7 @@ XLogFileInit(XLogSegNo logsegno, bool *use_existent, bool use_lock)
*use_existent = false;
/* Now open original target segment (might not be file I just made) */
// fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
if (fd < 0)
ereport(ERROR,
(errcode_for_file_access(),
@ -5253,8 +5249,7 @@ XLogFileOpen(XLogSegNo segno)
XLogFilePath(path, ThisTimeLineID, segno, wal_segment_size);
// fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = He3DBBasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method));
if (fd < 0)
ereport(PANIC,
(errcode_for_file_access(),
@ -5320,7 +5315,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
}
fd = He3DBBasicOpenFile(path, O_RDONLY | PG_BINARY);
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
if (fd >= 0)
{
/* Success! */
@ -5497,8 +5492,7 @@ XLogFileClose(void)
(void)posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
#endif
// if (close(openLogFile) != 0)
if (closefs(openLogFile) != 0)
if (close(openLogFile) != 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
@ -5539,7 +5533,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
_logSegNo++;
use_existent = true;
lf = XLogFileInit(_logSegNo, &use_existent, true);
closefs(lf);
close(lf);
if (!use_existent)
CheckpointStats.ckpt_segs_added++;
}
@ -7226,7 +7220,7 @@ exitArchiveRecovery(TimeLineID endTLI, XLogRecPtr endOfLog)
fd = XLogFileInit(startLogSegNo, &use_existent, true);
if (closefs(fd) != 0)
if (close(fd) != 0)
{
char xlogfname[MAXFNAMELEN];
int save_errno = errno;
@ -14306,7 +14300,11 @@ retry:
readOff = targetOff;
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff);
bool walStoreToLocal = false;
if (EnableHotStandby && !push_standby)
walStoreToLocal = true;
r = batchRead((uint8_t *) readBuf, ControlFile->checkPointCopy.ThisTimeLineID, targetOff, walStoreToLocal);
pgstat_report_wait_end();
/*

View File

@ -238,29 +238,29 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
// snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
// }
static int
xlogread(int64_t fd, void *buf, off_t offset, size_t size)
{
Bufrd result;
size_t count;
result = readfs(fd, offset, size);
if (result.count <= 0)
return (ssize_t)result.count;
else if (result.count <= XLOG_BLCKSZ)
{
memcpy(buf, result.buf, result.count);
count = result.count;
}
else
{
memcpy(buf, result.buf, XLOG_BLCKSZ);
count = BLCKSZ;
}
// static int
// xlogread(int64_t fd, void *buf, off_t offset, size_t size)
// {
// Bufrd result;
// size_t count;
// result = readfs(fd, offset, size);
// if (result.count <= 0)
// return (ssize_t)result.count;
// else if (result.count <= XLOG_BLCKSZ)
// {
// memcpy(buf, result.buf, result.count);
// count = result.count;
// }
// else
// {
// memcpy(buf, result.buf, XLOG_BLCKSZ);
// count = BLCKSZ;
// }
free_dataRead(result.buf, 1, 1);
return (ssize_t)count;
// free_dataRead(result.buf, 1, 1);
// return (ssize_t)count;
}
// }
/*
* Begin reading WAL at 'RecPtr'.
@ -1431,7 +1431,7 @@ He3DBWALRead(XLogReaderState *state,
pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif
nbytes = batchRead((uint8_t *) buf, state->currTLI, startptr);
nbytes = batchRead((uint8_t *) buf, state->currTLI, startptr, false);
#ifndef FRONTEND
pgstat_report_wait_end();

View File

@ -475,6 +475,7 @@ AuxiliaryProcessMain(int argc, char *argv[])
case CleanLogIndexProcess:
CleanLogIndexMain(0,NULL);
proc_exit(1);
default:
elog(PANIC, "unrecognized process type: %d", (int) MyAuxProcType);
proc_exit(1);

View File

@ -16,7 +16,6 @@ OBJS = \
autovacuum.o \
bgworker.o \
bgwriter.o \
relationcleanup.o \
checkpointer.o \
fork_process.o \
interrupt.o \

View File

@ -115,7 +115,6 @@
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
#include "postmaster/relationcleanup.h"
#include "replication/logicallauncher.h"
#include "replication/walsender.h"
#include "storage/fd.h"
@ -257,8 +256,7 @@ static pid_t StartupPID = 0,
AutoVacPID = 0,
PgArchPID = 0,
PgStatPID = 0,
SysLoggerPID = 0,
RelationCleanupID = 0;
SysLoggerPID = 0;
/* Startup process's status */
typedef enum
@ -565,7 +563,6 @@ static void ShmemBackendArrayRemove(Backend *bn);
#define StartCheckpointer() StartChildProcess(CheckpointerProcess)
#define StartWalWriter() StartChildProcess(WalWriterProcess)
#define StartWalReceiver() StartChildProcess(WalReceiverProcess)
#define StartRelationCleanup() StartChildProcess(RelationCleanupProcess)
/* Macros to check exit status of a child process */
#define EXIT_STATUS_0(st) ((st) == 0)
#define EXIT_STATUS_1(st) (WIFEXITED(st) && WEXITSTATUS(st) == 1)
@ -1788,10 +1785,6 @@ ServerLoop(void)
if (WalWriterPID == 0 && pmState == PM_RUN)
WalWriterPID = StartWalWriter();
/*if (RelationCleanupID == 0 && pmState == PM_RUN)
RelationCleanupID = StartRelationCleanup();*/
/*
* If we have lost the autovacuum launcher, try to start a new one. We
* don't want autovacuum to run in binary upgrade mode because
@ -2744,10 +2737,6 @@ SIGHUP_handler(SIGNAL_ARGS)
if (PgStatPID != 0)
signal_child(PgStatPID, SIGHUP);
if (RelationCleanupID !=0 )
signal_child(RelationCleanupID, SIGHUP);
/* Reload authentication config files too */
if (!load_hba())
ereport(LOG,
@ -3067,9 +3056,6 @@ reaper(SIGNAL_ARGS)
if (WalWriterPID == 0)
WalWriterPID = StartWalWriter();
// if (RelationCleanupID == 0)
// RelationCleanupID = StartRelationCleanup();
/*
* Likewise, start other special children as needed. In a restart
* situation, some of them may be alive already.
@ -3111,14 +3097,6 @@ reaper(SIGNAL_ARGS)
continue;
}
if (pid == RelationCleanupID)
{
RelationCleanupID = 0;
if (!EXIT_STATUS_0(exitstatus))
HandleChildCrash(pid, exitstatus,
_("relationcleanup process"));
continue;
}
/*
* Was it the checkpointer?
@ -3642,16 +3620,6 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
signal_child(BgWriterPID, (SendStop ? SIGSTOP : SIGQUIT));
}
if (pid == RelationCleanupID)
RelationCleanupID = 0;
else if (RelationCleanupID != 0 && take_action)
{
ereport(DEBUG2,
(errmsg_internal("sending %s to process %d",
(SendStop ? "SIGSTOP" : "SIGQUIT"),
(int) RelationCleanupID)));
signal_child(RelationCleanupID, (SendStop ? SIGSTOP : SIGQUIT));
}
/* Take care of the checkpointer too */
if (pid == CheckpointerPID)
CheckpointerPID = 0;
@ -3877,9 +3845,6 @@ PostmasterStateMachine(void)
signal_child(WalReceiverPID, SIGTERM);
/* checkpointer, archiver, stats, and syslogger may continue for now */
if (RelationCleanupID != 0)
signal_child(RelationCleanupID,SIGTERM);
/* Now transition to PM_WAIT_BACKENDS state to wait for them to die */
pmState = PM_WAIT_BACKENDS;
@ -3906,7 +3871,6 @@ PostmasterStateMachine(void)
StartupPID == 0 &&
WalReceiverPID == 0 &&
BgWriterPID == 0 &&
RelationCleanupID == 0 &&
(CheckpointerPID == 0 ||
(!FatalError && Shutdown < ImmediateShutdown)) &&
WalWriterPID == 0 &&
@ -4001,7 +3965,6 @@ PostmasterStateMachine(void)
Assert(StartupPID == 0);
Assert(WalReceiverPID == 0);
Assert(BgWriterPID == 0);
Assert(RelationCleanupID == 0);
Assert(CheckpointerPID == 0);
Assert(WalWriterPID == 0);
Assert(AutoVacPID == 0);
@ -4218,8 +4181,6 @@ TerminateChildren(int signal)
signal_child(PgArchPID, signal);
if (PgStatPID != 0)
signal_child(PgStatPID, signal);
if (RelationCleanupID != 0)
signal_child(RelationCleanupID, signal);
}
/*
@ -4317,10 +4278,6 @@ BackendStartup(Port *port)
report_fork_failure_to_client(port, save_errno);
return STATUS_ERROR;
}
/*if (!GetReqFlag())
{
SetReqFlag();
}*/
/* in parent, successful fork */
ereport(DEBUG2,
(errmsg_internal("forked new backend, pid=%d socket=%d",
@ -5250,9 +5207,6 @@ sigusr1_handler(SIGNAL_ARGS)
CheckpointerPID = StartCheckpointer();
Assert(BgWriterPID == 0);
BgWriterPID = StartBackgroundWriter();
// Assert(RelationCleanupID == 0);
// RelationCleanupID = StartRelationCleanup();
/*
* Start the archiver if we're responsible for (re-)archiving received
* files.

View File

@ -1,164 +0,0 @@
#include "postgres.h"
#include <limits.h>
#include <signal.h>
#include <unistd.h>
#include <string.h>
#include "miscadmin.h"
#include "access/xlog.h"
#include "access/pushpage.h"
#include "postmaster/interrupt.h"
#include "postmaster/relationcleanup.h"
#include "storage/procsignal.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/s_lock.h"
#include "storage/lock.h"
#include "storage/spin.h"
#include "utils/guc.h"
#include "utils/wait_event.h"
/*
 * State of the relation-cleanup process.  A single instance is obtained from
 * ShmemInitStruct() in InitCleanupInfo() and reached through the file-level
 * cleanupInfo pointer.
 */
typedef struct CleanupInfo
{
/* Last value returned by QueryReplyLsn(); exposed via getApplyLSN() */
XLogRecPtr applyLSN;
/* Set to 1 by SetReqFlag(); the main loop refreshes applyLSN while it is 1 */
int ReqFlag;
} CleanupInfo;
static CleanupInfo *cleanupInfo = NULL;
static int ReqFlag;
void RelationCleanupMain(void)
{
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SIG_IGN);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGCHLD, SIG_DFL);
cleanupInfo->ReqFlag = 0;
cleanupInfo->applyLSN = 0;
for (;;)
{
if ((cleanupInfo->ReqFlag == 1) || push_standby)
{
cleanupInfo->applyLSN = QueryReplyLsn(InvalidXLogRecPtr);
}
// usleep(10L);
}
}
/*
 * Cleanup
 *		Placeholder: currently does nothing.
 */
void Cleanup()
{
//TODO cut the lists and cleanup shmem
}
Size RelCutShmemSize(void)
{
return (Size)sizeof(CleanupInfo);
}
/*
 * InitCleanupInfo
 *		Attach to, or create, the shared CleanupInfo structure.
 *
 * The structure is looked up by name with ShmemInitStruct().  When this call
 * is the one that creates it, every field is given a defined starting value;
 * previously ReqFlag was left unset, so GetReqFlag() could read an
 * uninitialised value.
 */
void InitCleanupInfo()
{
	bool		found;

	/*
	 * Get or create the shared cleanup status block
	 */
	cleanupInfo = (CleanupInfo *)
		ShmemInitStruct("PrefixList cut Status",
						sizeof(CleanupInfo),
						&found);

	if (!found)
	{
		/* First attach: initialise all fields, not only applyLSN */
		cleanupInfo->applyLSN = 0;
		cleanupInfo->ReqFlag = 0;
	}
}
/*
 * getApplyLSN
 *		Return the applied LSN currently stored in the shared CleanupInfo.
 */
XLogRecPtr getApplyLSN()
{
	XLogRecPtr	current = cleanupInfo->applyLSN;

	return current;
}
/*
 * GetReqFlag
 *		Return the request flag stored in the shared CleanupInfo.
 */
int GetReqFlag()
{
	int			flag = cleanupInfo->ReqFlag;

	return flag;
}
void SetReqFlag()
{
cleanupInfo->ReqFlag = 1;
}
/*
void CutWalList(WalList *wl)
{
int size, low, high, mid;
XLogRecPtr applyLSN, lsn, firstLSN;
double rate;
SpinLockAcquire(&(wl->append_lck));
rate = (double)wl->len / wl->cap;
while (rate > 0.9)
{
if (localApplyLSN >= getApplyLSN())
{
SpinLockRelease(&(wl->append_lck));
while (localApplyLSN >= getApplyLSN())
{
usleep(5L);
}
SpinLockAcquire(&(wl->append_lck));
if ((double)wl->len / wl->cap < 0.9)
{
break;
}
}
firstLSN = ((WalLoc *)(wl->wals))->Lsn;
if (localApplyLSN < firstLSN)
{
continue;
}
size = wl->len;// / sizeof(WalLoc);
low = 0;
high = size - 1;
mid = -1;
while (low <= high)
{
mid = (low + high) / 2;
lsn = ((WalLoc *)(wl->wals + sizeof(WalLoc) * mid))->Lsn;
if (lsn > localApplyLSN)
{
high = mid - 1;
}
else if (lsn < localApplyLSN)
{
low = mid + 1;
}
else
{
break;
}
}
memcpy(wl->wals, (char *)(wl->wals + (mid + 1) * sizeof(WalLoc)), (wl->len - (mid + 1)) * sizeof(WalLoc));
wl->len = wl->len - (mid + 1); //* sizeof(WalLoc);
rate = (double)wl->len / wl->cap;
}
SpinLockRelease(&(wl->append_lck));
}
*/

View File

@ -611,7 +611,7 @@ WalReceiverMain(void)
XLogWalRcvFlush(false);
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (closefs(recvFile) != 0)
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not close log segment %s: %m",
@ -930,8 +930,8 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* OK to write the logs */
errno = 0;
// byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
byteswritten = writefs(recvFile, buf, segbytes, (off_t) startoff);
byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
// byteswritten = writefs(recvFile, buf, segbytes, (off_t) startoff);
if (byteswritten <= 0)
{

View File

@ -20,7 +20,7 @@
* is using it.
*
* ReleaseBuffer() -- unpin a buffer
*
*
* MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
* The disk write is delayed until buffer replacement or checkpoint.
*
@ -49,6 +49,7 @@
#include "storage/proc.h"
#include "storage/smgr.h"
#include "storage/standby.h"
#include "storage/md.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/rel.h"
@ -856,11 +857,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
bool isExtend;
bool isLocalBuf = SmgrIsTemp(smgr);
/* he3db: local tem buffer for pageXlog */
// char *pageXlogBuf;
char *pageXlogBuf;
int nbytes;
*hit = false;
// pageXlogBuf = NULL;
pageXlogBuf = NULL;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@ -1052,15 +1053,17 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (!((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
!isLocalBuf) && IsBootstrapProcessingMode() != true && InitdbSingle != true)
{
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
replayLsn = GetXLogReplayRecPtr(&tli);
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
GetXLogReplayRecPtr(&tli);
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
if (EnableHotStandby == true || InRecovery) {
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
replayLsn = GetXLogReplayRecPtr(&tli);
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
GetXLogReplayRecPtr(&tli);
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
}
}
/*
@ -1086,23 +1089,31 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
if (track_io_timing)
INSTR_TIME_SET_CURRENT(io_start);
if (exist == true && IsBootstrapProcessingMode() != true && InitdbSingle != true){
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
replayLsn = GetXLogReplayRecPtr(&tli);
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
GetXLogReplayRecPtr(&tli);
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
} else {
nbytes = he3dbsmgrread(smgr, forkNum, blockNum, &pageXlogPtr);
if (nbytes < BLCKSZ) {
elog(FATAL,"smgrextend=>he3dbsmgrread rel %d flk %d blk %d nbytes %d",smgr->smgr_rnode.node.relNode,forkNum, blockNum,nbytes);
if (EnableHotStandby == true || InRecovery) {
if (IsBootstrapProcessingMode() == true || InitdbSingle == true) {
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
} else {
memcpy(bufBlock,pageXlogPtr,BLCKSZ);
replayLsn = GetXLogReplayRecPtr(&tli);
if (exist == true) {
BufferTag pageTag;
pageTag.rnode = smgr->smgr_rnode.node;
pageTag.forkNum = forkNum;
pageTag.blockNum = blockNum;
XLogRecPtr pageLsn = BufferGetLSN(bufHdr);
head = GetLogIndexByPage(&pageTag,pageLsn,replayLsn);
GetXLogReplayRecPtr(&tli);
tWalRecord = ReadWalsByPage(pageTag.rnode.dbNode,pageTag.rnode.relNode,forkNum,blockNum,tli,head);
} else {
nbytes = he3db_mdread(smgr, forkNum, blockNum, &pageXlogPtr,true, replayLsn);
if (nbytes < BLCKSZ) {
elog(FATAL,"smgrextend=>he3dbsmgrread rel %d flk %d blk %d nbytes %d",smgr->smgr_rnode.node.relNode,forkNum, blockNum,nbytes);
} else {
memcpy(bufBlock,pageXlogPtr,BLCKSZ);
}
}
}
} else {
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
}
if (track_io_timing)
@ -1125,6 +1136,12 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
blockNum,
relpath(smgr->smgr_rnode, forkNum))));
MemSet((char *) bufBlock, 0, BLCKSZ);
if(pageXlogBuf != NULL)
{
free(pageXlogBuf);
pageXlogBuf = NULL;
}
}
else
ereport(ERROR,
@ -1164,8 +1181,6 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
else
{
//todo: read related wals in standby instance.
/*
* He3DB: page-replay.
*
@ -1499,8 +1514,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
// master/slave/push standby need to flush dirty page to release space
FlushBuffer(buf, NULL);
// } else {
// He3DBFlushBuffer(buf, NULL);
// He3DBFlushBuffer(buf, NULL);
// }
LWLockRelease(BufferDescriptorGetContentLock(buf));
@ -3275,11 +3289,22 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
/*
* bufToWrite is either the shared buffer or a copy, as appropriate.
*/
smgrwrite(reln,
buf->tag.forkNum,
buf->tag.blockNum,
bufToWrite,
false);
// smgrwrite(reln,
// buf->tag.forkNum,
// buf->tag.blockNum,
// bufToWrite,
// false);
PageKey pageKey;
pageKey.relfileNode.dbNode = buf->tag.rnode.dbNode;;
pageKey.relfileNode.relNode = buf->tag.rnode.relNode;
pageKey.relfileNode.spcNode = buf->tag.rnode.spcNode;
pageKey.blkNo = buf->tag.blockNum;
pageKey.forkNo = buf->tag.forkNum;
pageKey.pageLsn = BufferGetLSN(buf);
// Write the page out to the local disk
EvictOnePageOutOfMemory(pageKey, (char *)BufHdrGetBlock(buf));
if (track_io_timing)
{

File diff suppressed because it is too large Load Diff

View File

@ -4953,55 +4953,4 @@ FindFSMetaInTable(const FSKey *fsk)
hash_search(FSMetaHash, fsk, HASH_FIND, &found);
return fsm;
}
/*
 * CutWalList
 *		Trim already-applied entries from the front of a WAL location list.
 *
 * While the list is more than 90% full, every leading entry whose Lsn is not
 * greater than the applied LSN (the larger of localApplyLSN and
 * getApplyLSN()) is dropped and the remaining entries are shifted to the
 * start of the array.
 *
 * Assumes the entries are ordered by ascending Lsn, as the binary search in
 * the previous implementation also did -- TODO confirm against the code that
 * appends to the list.
 *
 * Changes from the previous implementation:
 *	- When no entry can be trimmed the loop now stops.  It used to `continue`
 *	  with the spinlock held and an unchanged fill rate, i.e. spin forever.
 *	- The search yields the exact number of entries with Lsn <= applied LSN.
 *	  The old search used the last probed index even when that entry was
 *	  newer than the applied LSN, so unapplied entries could be discarded.
 *	- Source and destination of the shift overlap, so memmove() is used
 *	  instead of memcpy().
 *	- The unused local applyLSN is gone.
 *
 * wl: list to trim; its append_lck spinlock is held for the whole operation.
 */
void CutWalList(WalList *wl)
{
	WalLoc	   *entries;

	SpinLockAcquire(&(wl->append_lck));
	entries = (WalLoc *) wl->wals;

	while ((double) wl->len / wl->cap > 0.9)
	{
		XLogRecPtr	applied = getApplyLSN();
		int			low = 0;
		int			high = wl->len;

		if (localApplyLSN < applied)
			localApplyLSN = applied;

		/* Upper bound: count of leading entries with Lsn <= localApplyLSN */
		while (low < high)
		{
			int			mid = low + (high - low) / 2;

			if (entries[mid].Lsn <= localApplyLSN)
				low = mid + 1;
			else
				high = mid;
		}

		/* Nothing has been applied yet; retrying under the lock cannot help */
		if (low == 0)
			break;

		memmove(entries, entries + low,
				(size_t) (wl->len - low) * sizeof(WalLoc));
		wl->len -= low;
	}

	SpinLockRelease(&(wl->append_lck));
}

View File

@ -675,7 +675,13 @@ mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
//TODO read page from disk
nbytes = MasterFileRead(buffer,reln->smgr_rnode.node.dbNode,reln->smgr_rnode.node.relNode,forknum,blocknum);
if (nbytes == 0)
{
nbytes = FileRead(v->mdfd_vfd, buffer, BLCKSZ, seekpos, WAIT_EVENT_DATA_FILE_READ);
}
TRACE_POSTGRESQL_SMGR_MD_READ_DONE(forknum, blocknum,
reln->smgr_rnode.node.spcNode,

View File

@ -24,6 +24,7 @@
#include "storage/md.h"
#include "storage/smgr.h"
#include "storage/filecache.h"
#include "utils/hfs.h"
#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/guc.h"
@ -446,6 +447,9 @@ smgrdounlinkall(SMgrRelation *rels, int nrels, bool isRedo)
for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
smgrsw[which].smgr_unlink(rnodes[i], forknum, isRedo);
//remove unused pages and related wals in localdisk cache.
RemoveBufferFromLocal(rnodes[i].node.dbNode, rnodes[i].node.relNode, MAX_FORKNUM, 0);
}
pfree(rnodes);
@ -674,6 +678,8 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]);
//remove unused pages and related wals in localdisk cache.
RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]);
/*
* We might as well update the local smgr_cached_nblocks values. The
* smgr cache inval message that this function sent will cause other
@ -737,6 +743,9 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
}
smgrsw[reln->smgr_which].smgr_truncate(reln, forknum[i], nblocks[i]);
//remove unused pages and related wals in localdisk cache.
RemoveBufferFromLocal(reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode, forknum[i], nblocks[i]);
/*
* We might as well update the local smgr_cached_nblocks values. The
* smgr cache inval message that this function sent will cause other

View File

@ -279,12 +279,15 @@ GetBackendTypeDesc(BackendType backendType)
case B_PARALLEL_FLUSH:
backendDesc = "parallel flush";
break;
case B_CLEAN_LOGINDEX:
backendDesc = "clean logindex";
break;
/*case B_RELATIONCLEANUP:
backendDesc = "relation cleanup";
break;*/
}
return backendDesc;

View File

@ -1,13 +0,0 @@
#include "postgres.h"
#include "access/xlogdefs.h"
#include "storage/lock.h"
extern void RelationCleanupMain(void);
extern Size RelCutShmemSize(void);
extern void InitCleanupInfo();
extern XLogRecPtr getApplyLSN();
// extern void CutWalList(WalList *wl);
extern void SetReqFlag();
extern int GetReqFlag();

View File

@ -86,22 +86,23 @@ extern int max_safe_fds;
/* Operations on virtual Files --- equivalent to Unix kernel file ops */
extern File PathNameOpenFile(const char *fileName, int fileFlags);
extern File He3DBPathNameOpenFile(const char *fileName, int fileFlags);
//extern File He3DBPathNameOpenFile(const char *fileName, int fileFlags);
extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
extern File He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
//extern File He3DBPathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
extern File OpenTemporaryFile(bool interXact);
extern void FileClose(File file);
extern void He3DBFileClose(File file);
//extern void He3DBFileClose(File file);
extern int FilePrefetch(File file, off_t offset, int amount, uint32 wait_event_info);
extern int FileRead(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
extern int MasterFileRead(char *buffer, uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blockno);
extern int He3DBFileRead(File file, char **buffer, off_t offset, uint32 wait_event_info, XLogRecPtr lsn, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno);
extern int FileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
extern int He3DBFileWrite(File file, char *buffer, int amount, off_t offset, uint32 wait_event_info);
extern int FileSync(File file, uint32 wait_event_info);
extern off_t FileSize(File file);
extern off_t He3DBFileSize(File file);
//extern off_t He3DBFileSize(File file);
extern int FileTruncate(File file, off_t offset, uint32 wait_event_info);
extern int He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp);
//extern int He3FileTruncate(File file, off_t offset, uint32 wait_event_info,bool isTemp);
extern void FileWriteback(File file, off_t offset, off_t nbytes, uint32 wait_event_info);
extern char *FilePathName(File file);
extern int FileGetRawDesc(File file);
@ -140,7 +141,7 @@ extern int CloseTransientFile(int fd);
extern int BasicOpenFile(const char *fileName, int fileFlags);
extern int64_t He3DBBasicOpenFile(const char *fileName, int fileFlags);
extern int BasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
extern int64_t He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
//extern int64_t He3DBBasicOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
/* Use these for other cases, and also for long-lived BasicOpenFile FDs */
extern bool AcquireExternalFD(void);
@ -185,9 +186,9 @@ extern void SyncDataDirectory(void);
extern int data_sync_elevel(int elevel);
/* He3DB: He3FS */
extern ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno);
extern ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset);
extern ssize_t he3fs_xlogread(int64_t fd, void *buf, off_t Offset, size_t size);
//extern ssize_t he3fs_pread(int64_t fd, void **buf, off_t offset, XLogRecPtr lsn, uint16 type, uint32_t dbid, uint32_t relid, uint32_t segno, uint32_t forkno);//
//extern ssize_t he3fs_pwrite(int64_t fd, const void *buf, size_t size, off_t offset);
//extern ssize_t he3fs_xlogread(int64_t fd, void *buf, off_t Offset, size_t size);
/* Filename components */
#define PG_TEMP_FILES_DIR "pgsql_tmp"

View File

@ -660,7 +660,6 @@ extern void InitDeadLockChecking(void);
extern int LockWaiterCount(const LOCKTAG *locktag);
extern WalList *SetupWalLOGInTable(const Prefix *wallog);
extern WalList *FindWalLOGInTable(const Prefix *wallog);
extern void CutWalList(WalList *wl);
#ifdef LOCK_DEBUG
extern void DumpLocks(PGPROC *proc);

View File

@ -30,6 +30,15 @@ typedef struct XLogItem
struct XLogItem *next;
} XLogItem;
/*
 * Identifies one version of one page for the local page/WAL cache calls
 * (EvictOnePageOutOfMemory, MoveOnePageToMemory, GetWalsFromDisk).
 */
typedef struct PageKey
{
/* Relation the page belongs to (spcNode / dbNode / relNode) */
RelFileNode relfileNode;
/* Fork number of the page */
uint32 forkNo;
/* Block number of the page within the fork */
uint32 blkNo;
/* LSN of the page; FlushBuffer fills it from BufferGetLSN() */
uint64 pageLsn;
/* NOTE(review): presumably the replay LSN of the request -- confirm with callers */
uint64 replyLsn;
} PageKey;
extern IOResult openfs(const char *pathname, int flags);
@ -52,7 +61,7 @@ extern Bufrd dataRead(int64_t fd,
extern void free_dataRead(uint8_t *buf, size_t count, size_t cap);
extern Bufrd readfs(int64_t fd, int64_t offset, uint32_t size);
extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr);
extern int batchRead(uint8_t *buf, uint32_t timeline, uint64_t startPtr, bool needStore);
extern uint8_t kvwrite(XLogItem *xlogItem);
extern uint8_t flushwals(XLogItem *xlogItem, uint32_t timeline);
extern uint8_t kvflush(XLogRecPtr lsn);
@ -64,4 +73,8 @@ extern Bufrd ReadWalsByPage(uint32_t dbid,
LsnNode* head);
extern void InsertConsistToKV(uint64_t lsn);
extern uint64_t GetConsistLsn(uint64_t lsn);
extern void DelConsistLsns(uint64_t lsn);
extern void DelConsistLsns(uint64_t lsn);
extern uint8_t EvictOnePageOutOfMemory(PageKey pageKey, char *value);
extern Bufrd MoveOnePageToMemory(PageKey pageKey);
extern Bufrd GetWalsFromDisk(PageKey pageKey);
extern void RemoveBufferFromLocal(uint32_t dbid, uint32_t relid, uint32_t forkno, uint32_t blkno);