Merge branch 'dev_performance' of gitee.com:he3db/he3pg into dev_performance

This commit is contained in:
shenzhengntu 2023-06-14 21:09:04 +08:00
commit 78a90efe61
9 changed files with 151 additions and 13 deletions

View File

@ -0,0 +1,76 @@
#!/bin/bash
export PATH=/home/postgres/psql14/bin:$PATH
primaryDataDir=/home/postgres/primary/pgdata
primaryImdbPageDirectory=/tmp/primarypagedb
primaryImdbWalDirectory=/tmp/primarywaldb
primaryLogfile=/home/postgres/primarylogfile
primaryPort=15432
primaryConninfo='application_name=pushstandby user=repl password=123456 host=127.0.0.1 port=15432 sslmode=disable sslcompression=0 gssencmode=disable target_session_attrs=any'
pushDataDir=/home/postgres/push/pgdata
pushImdbPageDirectory=/tmp/pushpagedb
pushImdbWalDirectory=/tmp/pushwaldb
pushLogfile=/home/postgres/pushlogfile
if [ ! -d "$primaryDataDir" ]; then
echo "$primaryDataDir does not exist!"
exit 1
fi
if [ "`ls -A $primaryDataDir`" != "" ]; then
echo "$primaryDataDir is not enpty!"
exit 1
fi
pg_ctl -D $pushDataDir -l $pushLogfile stop
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB stop failed!"
exit 1
fi
sed -i 's/^primary_conninfo/#primary_conninfo/g' $pushDataDir/postgresql.auto.conf
sed -i 's/^primary_conninfo/#primary_conninfo/g' $pushDataDir/postgresql.conf
sed -i 's/^he3mirror/#he3mirror/g' $pushDataDir/postgresql.conf
rsync -av --exclude base --exclude global --exclude standby.signal --exclude backup_label.old --exclude backup_manifest $pushDataDir/* $primaryDataDir/
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): sync data file failed!"
exit 1
fi
ln -s $pushDataDir/base $primaryDataDir/base
ln -s $pushDataDir/global $primaryDataDir/global
echo -e "primary_conninfo = '$primaryConninfo'" >> $pushDataDir/postgresql.conf
echo -e "he3mirror=false" >> $pushDataDir/postgresql.conf
echo -e "mpush=on" >> $pushDataDir/postgresql.conf
sed -i 's/^push_standby/#push_standby/g' $primaryDataDir/postgresql.conf
sed -i 's/^hot_standby/#hot_standby/g' $primaryDataDir/postgresql.conf
sed -i 's/^port/#port/g' $primaryDataDir/postgresql.conf
sed -i 's/^lmdb_page_directory/#lmdb_page_directory/g' $primaryDataDir/postgresql.conf
sed -i 's/^lmdb_wal_directory/#lmdb_wal_directory/g' $primaryDataDir/postgresql.conf
echo -e "push_standby=off" >> $primaryDataDir/postgresql.conf
echo -e "hot_standby=off" >> $primaryDataDir/postgresql.conf
echo -e "port=$primaryPort" >> $primaryDataDir/postgresql.conf
echo -e "he3mirror=false" >> $primaryDataDir/postgresql.conf
echo -e "lmdb_page_directory='$primaryImdbPageDirectory'" >> $primaryDataDir/postgresql.conf
echo -e "lmdb_wal_directory='$primaryImdbWalDirectory'" >> $primaryDataDir/postgresql.conf
rm -rf $primaryImdbPageDirectory $primaryImdbWalDirectory $pushImdbPageDirectory $pushImdbWalDirectory
pg_ctl -D $primaryDataDir -l $primaryLogfile start
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB primary instance start failed!"
exit 1
fi
pg_ctl -D $pushDataDir -l $pushLogfile start
if [ $? -ne 0 ]
then
echo "$(date "+%F %T"): He3DB push instance start failed!"
exit 1
fi

View File

@ -47,7 +47,16 @@ pid_t startupPid = 0;
// return true;
// }
bool ReConnectPrimaryDB(void) {
if (push_standby == true && pushconn!=NULL) {
PQfinish(pushconn);
pushconn = NULL;
if (ConnectPrimaryDB() == true) {
return true;
}
}
return false;
}
static bool ConnectPrimaryDB() {
char *err;

View File

@ -5819,8 +5819,17 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
if (errormsg)
ereport(emode_for_corrupt_record(emode, EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */));
} else {
if (xlogreader->ReadRecPtr > xlogreader->currRecPtr) {
elog(LOG, "read record ReadRecPtr %X gt currRecPtr %X, need clean wals which ge ReadRecPtr.",
xlogreader->ReadRecPtr, xlogreader->currRecPtr);
DelRangeWals(ThisTimeLineID, xlogreader->ReadRecPtr, PG_UINT64_MAX);
WalTaskImmediateFree();
return NULL;
}
}
/*
* Check page TLI is one of the expected values.
*/
@ -14582,7 +14591,10 @@ consumerXLogBatchRead(XLogReaderState *xlogreader, XLogRecPtr startPtr, int reqL
}
continue;
} else {
elog(FATAL,"stream startPtr %X gt FirstData.startLsn %X",FirstData->startLsn,startPtr);
elog(LOG, "stream FirstData.startLsn %X gt startPtr %X, need clean wals which ge startPtr.",
FirstData->startLsn, startPtr);
DelRangeWals(ThisTimeLineID, FirstData->startLsn, PG_UINT64_MAX);
WalTaskImmediateFree();
return -1;
}
} else {

View File

@ -1346,3 +1346,9 @@ FirstCallSinceLastCheckpoint(void)
return FirstCall;
}
pid_t
He3DBQueryCkpPid(void)
{
return CheckpointerShmem->checkpointer_pid;
}

View File

@ -3156,7 +3156,7 @@ reaper(SIGNAL_ARGS)
*/
SignalChildren(SIGUSR2);
pmState = PM_SHUTDOWN_2;
// pmState = PM_SHUTDOWN_2;
/*
* We can also shut down the stats collector now; there's
@ -3192,13 +3192,24 @@ reaper(SIGNAL_ARGS)
continue;
}
// if (pid == SecondBufferPID)
// {
// SecondBufferPID = 0;
// if (!EXIT_STATUS_0(exitstatus))
// HandleChildCrash(pid, exitstatus,
// _("second buffer process"));
// }
if (pid == SecondBufferPID)
{
SecondBufferPID = 0;
if (EXIT_STATUS_0(exitstatus) && pmState == PM_SHUTDOWN)
{
Assert(Shutdown > NoShutdown);
pmState = PM_SHUTDOWN_2;
}
else
{
/*
* Any unexpected exit of the checkpointer (including FATAL
* exit) is treated as a crash.
*/
HandleChildCrash(pid, exitstatus,
_("second buffer process"));
}
}
/*
* Was it the wal receiver? If exit status is zero (normal) or one

View File

@ -1132,13 +1132,36 @@ MovePageFromSecondBufferToLocalBuffer()
LWLock *partitionLock = NULL;
uint32 newHash;
bool canShutDown = false;
pid_t ckp_pid;
for (;;)
{
// exit when postmaster stop
ResetLatch(MyLatch);
if (ShutdownRequestPending)
proc_exit(0);
{
if (canShutDown)
proc_exit(0);
ckp_pid = He3DBQueryCkpPid();
if (ckp_pid == 0)
canShutDown = true;
if (kill(ckp_pid, 0) == -1)
{
if (errno == ESRCH)
{
elog(LOG, "checkpoint process is shutdown, we can shutdown secondbuffer process after flush all buffer into lmdb");
canShutDown = true;
}
else
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("could not check the existence of the backend with PID %d: %m",
ckp_pid)));
}
}
for (i = 0; i < SDNUM; i ++)
{

View File

@ -822,7 +822,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
nbytes, BLCKSZ)));
}
if (push_standby)
if (push_standby || !EnableHotStandby || *isPromoteIsTriggered)
{
return nbytes;
}

View File

@ -803,7 +803,7 @@ smgrtruncatelsn(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber
sleep(1);
minApplyLsn = He3DBQueryMinLsnFromAllStanby();
printf("====pushlsn=%lx==lsn==%lx==\n",minApplyLsn,lsn);
} while(minApplyLsn!=InvalidXLogRecPtr && minApplyLsn<lsn);
} while(minApplyLsn<lsn);
flag = true;
}
}

View File

@ -41,5 +41,6 @@ extern Size CheckpointerShmemSize(void);
extern void CheckpointerShmemInit(void);
extern bool FirstCallSinceLastCheckpoint(void);
extern pid_t He3DBQueryCkpPid(void);
#endif /* _BGWRITER_H */