diff --git a/cmd/master/main.go b/cmd/master/main.go index e5f1691c51..64597f02bf 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -29,27 +29,26 @@ func main() { statsChannel := master.Params.StatsChannels() opt := master.Option{ - KVRootPath: etcdRootPath, - MetaRootPath: etcdRootPath, - EtcdAddr: []string{etcdAddress}, - PulsarAddr: pulsarAddr, - ProxyIDs: master.Params.ProxyIDList(), - PulsarProxyChannels: master.Params.ProxyTimeSyncChannels(), - PulsarProxySubName: master.Params.ProxyTimeSyncSubName(), - SoftTTBInterval: master.Params.SoftTimeTickBarrierInterval(), - WriteIDs: master.Params.WriteIDList(), - PulsarWriteChannels: master.Params.WriteTimeSyncChannels(), - PulsarWriteSubName: master.Params.WriteTimeSyncSubName(), - PulsarDMChannels: master.Params.DMTimeSyncChannels(), - PulsarK2SChannels: master.Params.K2STimeSyncChannels(), - DefaultRecordSize: defaultRecordSize, - MinimumAssignSize: minimumAssignSize, - SegmentThreshold: segmentThreshold, - SegmentThresholdFactor: master.Params.SegmentThresholdFactor(), - SegmentExpireDuration: segmentExpireDuration, - NumOfChannel: numOfChannel, - NumOfQueryNode: nodeNum, - StatsChannels: statsChannel, + KVRootPath: etcdRootPath, + MetaRootPath: etcdRootPath, + EtcdAddr: []string{etcdAddress}, + PulsarAddr: pulsarAddr, + ProxyIDs: master.Params.ProxyIDList(), + PulsarProxyChannels: master.Params.ProxyTimeSyncChannels(), + PulsarProxySubName: master.Params.ProxyTimeSyncSubName(), + SoftTTBInterval: master.Params.SoftTimeTickBarrierInterval(), + WriteIDs: master.Params.WriteIDList(), + PulsarWriteChannels: master.Params.WriteTimeSyncChannels(), + PulsarWriteSubName: master.Params.WriteTimeSyncSubName(), + PulsarDMChannels: master.Params.DMTimeSyncChannels(), + PulsarK2SChannels: master.Params.K2STimeSyncChannels(), + DefaultRecordSize: defaultRecordSize, + MinimumAssignSize: minimumAssignSize, + SegmentThreshold: segmentThreshold, + SegmentExpireDuration: segmentExpireDuration, + NumOfChannel: numOfChannel, + NumOfQueryNode: nodeNum, + StatsChannels: statsChannel, } svr, err := master.CreateServer(ctx, &opt) diff --git a/configs/config.yaml b/configs/config.yaml index 7422ecefc8..87f4075fe0 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -31,7 +31,6 @@ master: minimumAssignSize: 1048576 segmentThreshold: 536870912 segmentExpireDuration: 2000 - segmentThresholdFactor: 0.75 querynodenum: 1 writenodenum: 1 statsChannels: "statistic" diff --git a/internal/master/master.go b/internal/master/master.go index 9eff9ad935..ebf5a69f73 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -48,14 +48,13 @@ type Option struct { PulsarDMChannels []string PulsarK2SChannels []string - DefaultRecordSize int64 - MinimumAssignSize int64 - SegmentThreshold float64 - SegmentThresholdFactor float64 - SegmentExpireDuration int64 - NumOfChannel int - NumOfQueryNode int - StatsChannels string + DefaultRecordSize int64 + MinimumAssignSize int64 + SegmentThreshold float64 + SegmentExpireDuration int64 + NumOfChannel int + NumOfQueryNode int + StatsChannels string } type Master struct { diff --git a/internal/master/paramtable.go b/internal/master/paramtable.go index 78bfbd43cb..efddbb012f 100644 --- a/internal/master/paramtable.go +++ b/internal/master/paramtable.go @@ -214,15 +214,3 @@ func (p *ParamTable) TopicNum() int { } return num } - -func (p *ParamTable) SegmentThresholdFactor() float64 { - factor, err := p.Load("master.segmentThresholdFactor") - if err != nil { - panic(err) - } - res, err := strconv.ParseFloat(factor, 64) - if err != nil { - panic(err) - } - return res -} diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go index 79169d7016..0c45405685 100644 --- a/internal/master/segment_manager.go +++ b/internal/master/segment_manager.go @@ -29,21 +29,20 @@ type segmentStatus struct { } type SegmentManager struct { - metaTable *metaTable - statsStream msgstream.MsgStream - channelRanges []*channelRange - segmentStatus map[UniqueID]*segmentStatus // segment id to segment status - collStatus map[UniqueID]*collectionStatus // collection id to collection status - defaultSizePerRecord int64 - minimumAssignSize int64 - segmentThreshold float64 - segmentThresholdFactor float64 - segmentExpireDuration int64 - numOfChannels int - numOfQueryNodes int - globalIDAllocator func() (UniqueID, error) - globalTSOAllocator func() (Timestamp, error) - mu sync.RWMutex + metaTable *metaTable + statsStream msgstream.MsgStream + channelRanges []*channelRange + segmentStatus map[UniqueID]*segmentStatus // segment id to segment status + collStatus map[UniqueID]*collectionStatus // collection id to collection status + defaultSizePerRecord int64 + minimumAssignSize int64 + segmentThreshold int64 + segmentExpireDuration int64 + numOfChannels int + numOfQueryNodes int + globalIDAllocator func() (UniqueID, error) + globalTSOAllocator func() (Timestamp, error) + mu sync.RWMutex } func (segMgr *SegmentManager) HandleQueryNodeMsgPack(msgPack *msgstream.MsgPack) error { @@ -77,7 +76,7 @@ func (segMgr *SegmentManager) handleSegmentStat(segStats *internalpb.SegmentStat segMeta.NumRows = segStats.NumRows segMeta.MemSize = segStats.MemorySize - if segStats.MemorySize > int64(segMgr.segmentThresholdFactor*segMgr.segmentThreshold) { + if segStats.MemorySize > segMgr.segmentThreshold { return segMgr.closeSegment(segMeta) } return segMgr.metaTable.UpdateSegment(segMeta) @@ -151,7 +150,6 @@ func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDReques func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, partitionTag string, count uint32, channelID int32, collStatus *collectionStatus) (*internalpb.SegIDAssignment, error) { - segmentThreshold := int64(segMgr.segmentThreshold) for _, segID := range collStatus.openedSegments { segMeta, _ := segMgr.metaTable.GetSegmentByID(segID) if segMeta.GetCloseTime() != 0 || channelID < segMeta.GetChannelStart() || @@ -162,8 +160,8 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa assignedMem := segMgr.checkAssignedSegExpire(segID) memSize := segMeta.MemSize neededMemSize := segMgr.calNeededSize(memSize, segMeta.NumRows, int64(count)) - if memSize+assignedMem+neededMemSize <= segmentThreshold { - remainingSize := segmentThreshold - memSize - assignedMem + if memSize+assignedMem+neededMemSize <= segMgr.segmentThreshold { + remainingSize := segMgr.segmentThreshold - memSize - assignedMem allocMemSize := segMgr.calAllocMemSize(neededMemSize, remainingSize) segMgr.addAssignment(segID, allocMemSize) return &internalpb.SegIDAssignment{ @@ -176,7 +174,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa } } neededMemSize := segMgr.defaultSizePerRecord * int64(count) - if neededMemSize > segmentThreshold { + if neededMemSize > segMgr.segmentThreshold { return nil, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold", count, neededMemSize) } @@ -186,7 +184,7 @@ func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, pa return nil, err } - allocMemSize := segMgr.calAllocMemSize(neededMemSize, segmentThreshold) + allocMemSize := segMgr.calAllocMemSize(neededMemSize, segMgr.segmentThreshold) segMgr.addAssignment(segMeta.SegmentID, allocMemSize) return &internalpb.SegIDAssignment{ SegID: segMeta.SegmentID, @@ -329,19 +327,18 @@ func NewSegmentManager(meta *metaTable, globalTSOAllocator func() (Timestamp, error), ) *SegmentManager { segMgr := &SegmentManager{ - metaTable: meta, - channelRanges: make([]*channelRange, 0), - segmentStatus: make(map[UniqueID]*segmentStatus), - collStatus: make(map[UniqueID]*collectionStatus), - segmentThreshold: opt.SegmentThreshold, - segmentThresholdFactor: opt.SegmentThresholdFactor, - segmentExpireDuration: opt.SegmentExpireDuration, - minimumAssignSize: opt.MinimumAssignSize, - defaultSizePerRecord: opt.DefaultRecordSize, - numOfChannels: opt.NumOfChannel, - numOfQueryNodes: opt.NumOfQueryNode, - globalIDAllocator: globalIDAllocator, - globalTSOAllocator: globalTSOAllocator, + metaTable: meta, + channelRanges: make([]*channelRange, 0), + segmentStatus: make(map[UniqueID]*segmentStatus), + collStatus: make(map[UniqueID]*collectionStatus), + segmentThreshold: int64(opt.SegmentThreshold), + segmentExpireDuration: opt.SegmentExpireDuration, + minimumAssignSize: opt.MinimumAssignSize, + defaultSizePerRecord: opt.DefaultRecordSize, + numOfChannels: opt.NumOfChannel, + numOfQueryNodes: opt.NumOfQueryNode, + globalIDAllocator: globalIDAllocator, + globalTSOAllocator: globalTSOAllocator, } segMgr.createChannelRanges() return segMgr diff --git a/internal/master/segment_manager_test.go b/internal/master/segment_manager_test.go index faa6264009..f6556c155e 100644 --- a/internal/master/segment_manager_test.go +++ b/internal/master/segment_manager_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/golang/protobuf/proto" + "github.com/gogo/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" @@ -78,13 +78,12 @@ func setup() { panic(err) } opt := &Option{ - SegmentThreshold: 536870912, - SegmentExpireDuration: 2000, - MinimumAssignSize: 1048576, - DefaultRecordSize: 1024, - NumOfQueryNode: 3, - NumOfChannel: 5, - SegmentThresholdFactor: 0.75, + SegmentThreshold: 536870912, + SegmentExpireDuration: 2000, + MinimumAssignSize: 1048576, + DefaultRecordSize: 1024, + NumOfQueryNode: 3, + NumOfChannel: 5, } var cnt int64 @@ -210,7 +209,7 @@ func TestSegmentManager_SegmentStats(t *testing.T) { // close segment stats.SegStats[0].NumRows = 600000 - stats.SegStats[0].MemorySize = int64(0.8 * segMgr.segmentThreshold) + stats.SegStats[0].MemorySize = 600000000 err = segMgr.HandleQueryNodeMsgPack(&msgPack) assert.Nil(t, err) segMeta, _ = mt.GetSegmentByID(100)