mirror of
https://gitee.com/he3db/he3pg.git
synced 2024-11-29 18:58:35 +08:00
!164 fix: #I7BZYU
Merge pull request !164 from shipixian/dev_performance
This commit is contained in:
commit
a225a5affd
@ -398,7 +398,7 @@ pg_truncate_visibility_map(PG_FUNCTION_ARGS)
|
|||||||
if (BlockNumberIsValid(block))
|
if (BlockNumberIsValid(block))
|
||||||
{
|
{
|
||||||
fork = VISIBILITYMAP_FORKNUM;
|
fork = VISIBILITYMAP_FORKNUM;
|
||||||
smgrtruncate(rel->rd_smgr, &fork, 1, &block);
|
smgrtruncatelsn(rel->rd_smgr, &fork, 1, &block);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (RelationNeedsWAL(rel))
|
if (RelationNeedsWAL(rel))
|
||||||
|
@ -5808,7 +5808,7 @@ static void
|
|||||||
xact_redo_commit(xl_xact_parsed_commit *parsed,
|
xact_redo_commit(xl_xact_parsed_commit *parsed,
|
||||||
TransactionId xid,
|
TransactionId xid,
|
||||||
XLogRecPtr lsn,
|
XLogRecPtr lsn,
|
||||||
RepOriginId origin_id)
|
RepOriginId origin_id, XLogRecPtr startlsn)
|
||||||
{
|
{
|
||||||
TransactionId max_xid;
|
TransactionId max_xid;
|
||||||
TimestampTz commit_time;
|
TimestampTz commit_time;
|
||||||
@ -5916,6 +5916,15 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
|
|||||||
if (push_standby)
|
if (push_standby)
|
||||||
{
|
{
|
||||||
pushTikv(0, hashMapSize(), true);
|
pushTikv(0, hashMapSize(), true);
|
||||||
|
} else {
|
||||||
|
XLogRecPtr consistPtr;
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
while (consistPtr < startlsn)
|
||||||
|
{
|
||||||
|
pg_usleep(100000L);
|
||||||
|
elog(LOG, "standby consist lsn %ld, commit lsn %ld", consistPtr, startlsn);
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Make sure files supposed to be dropped are dropped */
|
/* Make sure files supposed to be dropped are dropped */
|
||||||
@ -5957,7 +5966,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
|
|||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
|
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
|
||||||
XLogRecPtr lsn, RepOriginId origin_id)
|
XLogRecPtr lsn, RepOriginId origin_id, XLogRecPtr startlsn)
|
||||||
{
|
{
|
||||||
TransactionId max_xid;
|
TransactionId max_xid;
|
||||||
|
|
||||||
@ -6025,6 +6034,15 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
|
|||||||
if (push_standby)
|
if (push_standby)
|
||||||
{
|
{
|
||||||
pushTikv(0, hashMapSize(), true);
|
pushTikv(0, hashMapSize(), true);
|
||||||
|
} else {
|
||||||
|
XLogRecPtr consistPtr;
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
while (consistPtr < startlsn)
|
||||||
|
{
|
||||||
|
pg_usleep(100000L);
|
||||||
|
elog(LOG, "standby consist lsn %ld, abort lsn %ld", consistPtr, startlsn);
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
|
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
|
||||||
@ -6046,7 +6064,7 @@ xact_redo(XLogReaderState *record)
|
|||||||
|
|
||||||
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
||||||
xact_redo_commit(&parsed, XLogRecGetXid(record),
|
xact_redo_commit(&parsed, XLogRecGetXid(record),
|
||||||
record->EndRecPtr, XLogRecGetOrigin(record));
|
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
|
||||||
}
|
}
|
||||||
else if (info == XLOG_XACT_COMMIT_PREPARED)
|
else if (info == XLOG_XACT_COMMIT_PREPARED)
|
||||||
{
|
{
|
||||||
@ -6055,7 +6073,7 @@ xact_redo(XLogReaderState *record)
|
|||||||
|
|
||||||
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
ParseCommitRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
||||||
xact_redo_commit(&parsed, parsed.twophase_xid,
|
xact_redo_commit(&parsed, parsed.twophase_xid,
|
||||||
record->EndRecPtr, XLogRecGetOrigin(record));
|
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
|
||||||
|
|
||||||
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
|
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
|
||||||
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
||||||
@ -6069,7 +6087,7 @@ xact_redo(XLogReaderState *record)
|
|||||||
|
|
||||||
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
||||||
xact_redo_abort(&parsed, XLogRecGetXid(record),
|
xact_redo_abort(&parsed, XLogRecGetXid(record),
|
||||||
record->EndRecPtr, XLogRecGetOrigin(record));
|
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
|
||||||
}
|
}
|
||||||
else if (info == XLOG_XACT_ABORT_PREPARED)
|
else if (info == XLOG_XACT_ABORT_PREPARED)
|
||||||
{
|
{
|
||||||
@ -6078,7 +6096,7 @@ xact_redo(XLogReaderState *record)
|
|||||||
|
|
||||||
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
|
||||||
xact_redo_abort(&parsed, parsed.twophase_xid,
|
xact_redo_abort(&parsed, parsed.twophase_xid,
|
||||||
record->EndRecPtr, XLogRecGetOrigin(record));
|
record->EndRecPtr, XLogRecGetOrigin(record), record->ReadRecPtr);
|
||||||
|
|
||||||
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
|
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
|
||||||
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
|
||||||
|
@ -9330,6 +9330,20 @@ void StartupXLOG(void)
|
|||||||
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
|
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
|
||||||
pushTikv(0,hashMapSize(),true);
|
pushTikv(0,hashMapSize(),true);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
RmgrId rmgrId = XLogRecGetRmid(xlogreader);
|
||||||
|
uint8 info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
|
||||||
|
if (rmgrId == RM_SMGR_ID && info == XLOG_SMGR_TRUNCATE) {
|
||||||
|
XLogRecPtr consistPtr, startlsn;
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
startlsn = record->xl_end - record->xl_tot_len;
|
||||||
|
while (consistPtr < startlsn)
|
||||||
|
{
|
||||||
|
pg_usleep(100000L);
|
||||||
|
elog(LOG, "standby consist lsn %ld, truncate lsn %ld", consistPtr, startlsn);
|
||||||
|
consistPtr = GetXLogPushToDisk();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
|
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
|
||||||
hasReplay = true;
|
hasReplay = true;
|
||||||
|
@ -1172,6 +1172,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
bufHdr->pageIsVaild = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
|
smgrread(smgr, forkNum, blockNum, (char *) bufBlock);
|
||||||
@ -1476,8 +1477,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||||||
* If we get here, previous attempts to read the buffer must
|
* If we get here, previous attempts to read the buffer must
|
||||||
* have failed ... but we shall bravely try again.
|
* have failed ... but we shall bravely try again.
|
||||||
*/
|
*/
|
||||||
|
if (buf->pageIsVaild == false) {
|
||||||
|
*exist = false;
|
||||||
|
} else {
|
||||||
|
*exist = true;
|
||||||
|
}
|
||||||
*foundPtr = false;
|
*foundPtr = false;
|
||||||
*exist = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1696,6 +1701,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||||||
* If we get here, previous attempts to read the buffer
|
* If we get here, previous attempts to read the buffer
|
||||||
* must have failed ... but we shall bravely try again.
|
* must have failed ... but we shall bravely try again.
|
||||||
*/
|
*/
|
||||||
|
if (buf->pageIsVaild == false) {
|
||||||
|
*exist = false;
|
||||||
|
} else {
|
||||||
|
*exist = true;
|
||||||
|
}
|
||||||
*foundPtr = false;
|
*foundPtr = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1748,7 +1758,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||||||
buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
|
buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
|
||||||
else
|
else
|
||||||
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
|
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
|
||||||
|
buf->pageIsVaild = false;
|
||||||
UnlockBufHdr(buf, buf_state);
|
UnlockBufHdr(buf, buf_state);
|
||||||
|
|
||||||
if (oldPartitionLock != NULL)
|
if (oldPartitionLock != NULL)
|
||||||
@ -1766,8 +1776,14 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
|
|||||||
* to read it before we did, so there's nothing left for BufferAlloc() to
|
* to read it before we did, so there's nothing left for BufferAlloc() to
|
||||||
* do.
|
* do.
|
||||||
*/
|
*/
|
||||||
if (StartBufferIO(buf, true))
|
if (StartBufferIO(buf, true)) {
|
||||||
|
if (buf->pageIsVaild == false) {
|
||||||
|
*exist = false;
|
||||||
|
} else {
|
||||||
|
*exist = true;
|
||||||
|
}
|
||||||
*foundPtr = false;
|
*foundPtr = false;
|
||||||
|
}
|
||||||
else
|
else
|
||||||
*foundPtr = true;
|
*foundPtr = true;
|
||||||
|
|
||||||
|
@ -773,7 +773,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
|||||||
pageKey.relfileNode.relNode = pageTag.rnode.relNode;
|
pageKey.relfileNode.relNode = pageTag.rnode.relNode;
|
||||||
pageKey.forkNo = (uint32)pageTag.forkNum;
|
pageKey.forkNo = (uint32)pageTag.forkNum;
|
||||||
pageKey.blkNo = pageTag.blockNum;
|
pageKey.blkNo = pageTag.blockNum;
|
||||||
pageKey.pageLsn = 0;
|
pageKey.pageLsn = GetXLogPushToDisk();
|
||||||
pageKey.replyLsn = lsn;
|
pageKey.replyLsn = lsn;
|
||||||
|
|
||||||
odpk.pk = pageKey;
|
odpk.pk = pageKey;
|
||||||
@ -827,7 +827,7 @@ int he3db_mdread_pagexlog(SMgrRelation reln, ForkNumber forknum, BlockNumber blo
|
|||||||
return nbytes;
|
return nbytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
pageKey.pageLsn = PageGetLSN(*buffer);
|
pageKey.pageLsn = GetXLogPushToDisk();
|
||||||
|
|
||||||
pageKey.replyLsn = lsn;
|
pageKey.replyLsn = lsn;
|
||||||
|
|
||||||
|
@ -197,6 +197,7 @@ typedef struct BufferDesc
|
|||||||
int wait_backend_pid; /* backend PID of pin-count waiter */
|
int wait_backend_pid; /* backend PID of pin-count waiter */
|
||||||
int freeNext; /* link in freelist chain */
|
int freeNext; /* link in freelist chain */
|
||||||
LWLock content_lock; /* to lock access to buffer contents */
|
LWLock content_lock; /* to lock access to buffer contents */
|
||||||
|
bool pageIsVaild;
|
||||||
} BufferDesc;
|
} BufferDesc;
|
||||||
extern BufferDesc **bulk_io_in_progress_buf;
|
extern BufferDesc **bulk_io_in_progress_buf;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user