milvus/internal/datanode/collection_replica.go
sunby 189ac881f3 Fix bugs (#5676)
* Remove redundant session startup

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Register datanode after start success

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* fix meta snap shot

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* fix datanode message stream channel

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* Fix bugs when drop empty collection

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Fix bug of getting pchan statistics from task scheduler

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* Fix i/dist/dataservice test code

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Fix epoch lifetime not applied

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* fix datanode flowgraph dd node

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* Fix handle datanode timetick bug

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Remove repack function of dml stream

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* fix proxynode

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Apply extended seal policy

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* add check for time tick

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* fix check

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Fix the repack function of dml stream

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* Fix the bug when send statistics of pchan

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* Fix the repack function when creating dml stream

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* fix bugs

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* fix describe collection

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Fix bug when send timestamp statistics

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* fix data node

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Add length check before flush request

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* add log for data node

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Fix SaveBinlog bugs

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Add more log in datanode

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* Put SegmentState.Flushing as the last one in enum to fit the client

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Fix params in GetInsertBinlogPaths

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Rename policy

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Remove unused ddl functions and fields

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Remove pchan when drop collection

Signed-off-by: dragondriver <jiquan.long@zilliz.com>

* Add balanced assignment policy

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* fix master ut

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Add lock in session manager

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* add log for debug

Signed-off-by: yefu.chen <yefu.chen@zilliz.com>

* Fix some logic bug and typo

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Fix recover bugs

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Get collection scheme of a specific timestamp

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* Change CheckPoint to SegmentInfo in VchannelInfo

Signed-off-by: sunby <bingyi.sun@zilliz.com>

* Recover Unflushed segment numOfRows

Signed-off-by: yangxuan <xuan.yang@zilliz.com>

* Fix dataservice unit tests

Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: yefu.chen <yefu.chen@zilliz.com>
Co-authored-by: yangxuan <xuan.yang@zilliz.com>
Co-authored-by: dragondriver <jiquan.long@zilliz.com>
Co-authored-by: Congqi Xia <congqi.xia@zilliz.com>
2021-06-15 16:06:11 +08:00

323 lines
9.9 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"context"
"fmt"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
)
// Replica is the set of collection- and segment-level operations the data node
// performs against its in-memory replica. The behaviour notes below describe
// the CollectionSegmentReplica implementation in this file.
type Replica interface {
// init fetches the collection schema through the meta service, passing initTs.
init(initTs Timestamp) error
// collection
// getCollectionByID returns the replica's collection and rejects any other ID.
getCollectionByID(collectionID UniqueID, ts Timestamp) (*Collection, error)
// hasCollection reports whether collectionID is the replica's collection and
// its schema has been set.
hasCollection(collectionID UniqueID) bool
// segment
// addSegment stores a new Segment under segmentID.
addSegment(segmentID, collID, partitionID UniqueID, channelName string) error
// removeSegment deletes the segment and its start/end positions.
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
// updateStatistics adds numRows to the segment's row count.
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
getSegmentByID(segmentID UniqueID) (*Segment, error)
// bufferAutoFlushBinlogPaths appends one binlog path per field ID to the
// segment's buffered paths.
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
// getBufferPaths returns the segment's buffered field ID -> binlog paths map.
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
getChannelName(segID UniqueID) (string, error)
// positions
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
// getSegmentPositions returns the stored (start, end) positions of a segment.
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
// listOpenSegmentCheckPointAndNumRows returns, keyed by segment ID, the first
// stored end position and the row count of each segment in segs.
listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
}
// Segment is the data structure of segments in data node replica.
type Segment struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
// numRows is increased by updateStatistics.
numRows int64
// memorySize is reset to 0 by updateStatistics and reported by
// getSegmentStatisticsUpdates.
memorySize int64
// isNew holds a bool; addSegment stores true.
isNew atomic.Value // bool
channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
}
// CollectionSegmentReplica is the data replication of persistent data in datanode.
// It implements `Replica` interface.
type CollectionSegmentReplica struct {
// mu is taken by the methods that access segments and by the collection
// accessors getCollectionByID / hasCollection.
mu sync.RWMutex
// collection is the single collection this replica serves.
collection *Collection
// segments maps segment ID to its in-memory Segment.
segments map[UniqueID]*Segment
// metaService is used to fetch the collection schema.
metaService *metaService
// posMu is taken by the methods that access startPositions and endPositions.
posMu sync.Mutex
// startPositions and endPositions are keyed by segment ID.
startPositions map[UniqueID][]*internalpb.MsgPosition
endPositions map[UniqueID][]*internalpb.MsgPosition
}
// Compile-time check that *CollectionSegmentReplica satisfies Replica.
var _ Replica = &CollectionSegmentReplica{}
// newReplica builds a CollectionSegmentReplica for collectionID with empty
// segment and position maps and a meta service created from ms. Only the
// collection ID is set here; the schema is left unset.
func newReplica(ms types.MasterService, collectionID UniqueID) Replica {
	return &CollectionSegmentReplica{
		collection:     &Collection{id: collectionID},
		segments:       make(map[UniqueID]*Segment),
		metaService:    newMetaService(ms, collectionID),
		startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
		endPositions:   make(map[UniqueID][]*internalpb.MsgPosition),
	}
}
// init asks the meta service for the schema of the replica's collection,
// passing initTs, and stores the result on the collection.
// On failure the error is logged and returned, and the schema is left untouched.
func (replica *CollectionSegmentReplica) init(initTs Timestamp) error {
	log.Debug("Initing replica ...")

	collID := replica.collection.GetID()
	sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, initTs)
	if err == nil {
		replica.collection.schema = sch
		return nil
	}

	log.Error("Replica init fail", zap.Error(err))
	return err
}
// getChannelName returns the channel name recorded for segment segID,
// or an error when the replica does not hold that segment.
func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	if s, found := replica.segments[segID]; found {
		return s.channelName, nil
	}
	return "", fmt.Errorf("Cannot find segment, id = %v", segID)
}
// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush.
//
// For every (fieldID, path) pair in field2Path, path is appended to the list of
// paths already buffered for that field on segment segID. It returns an error
// when the replica does not hold the segment.
func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error {
	// The write lock is required: seg.field2Paths is mutated below, and a read
	// lock would let two callers write the same map concurrently.
	replica.mu.Lock()
	defer replica.mu.Unlock()

	seg, ok := replica.segments[segID]
	if !ok {
		return fmt.Errorf("Cannot find segment, id = %v", segID)
	}

	for fieldID, path := range field2Path {
		// append on a missing (nil) entry allocates the slice for us.
		seg.field2Paths[fieldID] = append(seg.field2Paths[fieldID], path)
	}

	log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID))
	return nil
}
// getBufferPaths returns the buffered fieldID -> binlog paths map of segment
// segID. The segment's own map is returned, not a copy. An error is returned
// when the replica does not hold the segment.
func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	s, found := replica.segments[segID]
	if !found {
		return nil, fmt.Errorf("Cannot find segment, id = %v", segID)
	}
	return s.field2Paths, nil
}
// getSegmentByID looks up segmentID in the replica and returns the stored
// *Segment, or an error when no such segment exists.
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	s, found := replica.segments[segmentID]
	if !found {
		return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
	}
	return s, nil
}
// addSegment registers a segment in the replica when the data node first sees it.
//
// The new Segment starts with an empty field2Paths map and isNew set to true.
// An existing entry under the same segmentID is overwritten. Always returns nil.
func (replica *CollectionSegmentReplica) addSegment(
	segmentID UniqueID,
	collID UniqueID,
	partitionID UniqueID,
	channelName string) error {

	replica.mu.Lock()
	defer replica.mu.Unlock()
	log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))

	created := &Segment{
		segmentID:    segmentID,
		collectionID: collID,
		partitionID:  partitionID,
		channelName:  channelName,
		field2Paths:  make(map[UniqueID][]string),
	}
	created.isNew.Store(true)

	replica.segments[segmentID] = created
	return nil
}
// removeSegment drops segmentID from the segment map and then from both
// position maps. The two locks are taken one after the other, never together.
// Removing an unknown segment is a no-op. Always returns nil.
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
	// Segment bookkeeping, guarded by mu.
	replica.mu.Lock()
	delete(replica.segments, segmentID)
	replica.mu.Unlock()

	// Position bookkeeping, guarded by posMu.
	replica.posMu.Lock()
	delete(replica.startPositions, segmentID)
	delete(replica.endPositions, segmentID)
	replica.posMu.Unlock()

	return nil
}
// hasSegment reports whether the replica currently holds segmentID.
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	_, exists := replica.segments[segmentID]
	return exists
}
// updateStatistics adds numRows to the row count of segment segmentID and
// resets its memorySize to 0. It returns an error when the replica does not
// hold the segment.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
	replica.mu.Lock()
	defer replica.mu.Unlock()

	s, found := replica.segments[segmentID]
	if !found {
		return fmt.Errorf("There's no segment %v", segmentID)
	}

	log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
	s.memorySize = 0
	s.numRows += numRows
	return nil
}
// getSegmentStatisticsUpdates gives current segment's statistics updates.
//
// It returns a freshly built SegmentStatisticsUpdates carrying the segment ID,
// memorySize and numRows of segment segmentID, or an error when the replica
// does not hold the segment.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
	// Nothing is mutated here, so a read lock is enough; this matches the other
	// read-only accessors and lets concurrent readers proceed.
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	seg, ok := replica.segments[segmentID]
	if !ok {
		return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
	}

	return &internalpb.SegmentStatisticsUpdates{
		SegmentID:  segmentID,
		MemorySize: seg.memorySize,
		NumRows:    seg.numRows,
	}, nil
}
// --- collection ---

// getCollectionByID returns the replica's collection.
//
// Any collectionID other than the replica's own is rejected with an error.
// When the collection has no schema yet, the schema is fetched through the
// meta service with the given ts and stored before returning; a fetch failure
// is returned as-is. Per the original note, pass ts = 0 for the latest schema.
func (replica *CollectionSegmentReplica) getCollectionByID(collectionID UniqueID, ts Timestamp) (*Collection, error) {
	replica.mu.Lock()
	defer replica.mu.Unlock()

	coll := replica.collection
	if coll.GetID() != collectionID {
		return nil, fmt.Errorf("Not supported collection %v", collectionID)
	}

	// Schema already set: nothing to fetch.
	if coll.GetSchema() != nil {
		return coll, nil
	}

	fetched, err := replica.metaService.getCollectionSchema(context.Background(), collectionID, ts)
	if err != nil {
		return nil, err
	}
	coll.schema = fetched
	return coll, nil
}
// hasCollection reports whether collectionID is the replica's collection and
// that collection already has a schema set.
func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bool {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	coll := replica.collection
	return coll != nil &&
		collectionID == coll.GetID() &&
		coll.schema != nil
}
// getSegmentsCheckpoints is meant to get current open segments checkpoints.
// It is not implemented yet: it only acquires and releases the read lock.
func (replica *CollectionSegmentReplica) getSegmentsCheckpoints() {
	replica.mu.RLock()
	defer replica.mu.RUnlock()

	// TODO: the disabled draft iterated over replica.segments here:
	//   for segID, segment := range replica.segments { ... }
}
// setStartPositions records the start positions of segment segID, replacing
// any previously stored value. Per the original note, these are the
// `startPositions` of the MsgPack in which the segment was first found.
// Always returns nil.
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.startPositions[segID] = startPositions

	return nil
}
// setEndPositions records the end positions of segment segID, replacing any
// previously stored value. Per the original note, these are the `endPositions`
// of the MsgPack at the time the segment needs to be flushed.
// Always returns nil.
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	replica.endPositions[segID] = endPositions

	return nil
}
// getSegmentPositions returns the stored (start, end) positions of segment
// segID; either value is nil when nothing has been stored for it.
//
// To be noted: the start and end positions do NOT come from one single MsgPack;
// they are taken from different MsgPacks. See the setStartPositions and
// setEndPositions comments.
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	return replica.startPositions[segID], replica.endPositions[segID]
}
// listOpenSegmentCheckPointAndNumRows returns, for the segment IDs in segs,
// two maps keyed by segment ID: a copy of the first stored end position, and
// the segment's row count.
//
// A segment is left out of the first map when it has no end position stored
// (or its first entry is nil), and out of the second map when the replica does
// not hold it. These cases used to panic with an index-out-of-range or nil
// pointer dereference.
func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
	// replica.segments is protected by mu and the position maps by posMu, so
	// both are needed. No other method visible in this file holds the two locks
	// at the same time, so taking mu before posMu cannot deadlock with them.
	replica.mu.RLock()
	defer replica.mu.RUnlock()
	replica.posMu.Lock()
	defer replica.posMu.Unlock()

	checkPoints := make(map[UniqueID]internalpb.MsgPosition, len(segs))
	numRows := make(map[UniqueID]int64, len(segs))
	for _, segID := range segs {
		if endPos := replica.endPositions[segID]; len(endPos) > 0 && endPos[0] != nil {
			checkPoints[segID] = *endPos[0]
		}
		if seg, ok := replica.segments[segID]; ok && seg != nil {
			numRows[segID] = seg.numRows
		}
	}
	return checkPoints, numRows
}