milvus/docs/developer_guides/chap07_query_coordinator.md
ryjiang f4e984bc2d
[skip ci]Add comments in chap07_query_coordinator.md (#12051)
Signed-off-by: ruiyi.jiang <ruiyi.jiang@zilliz.com>
2021-11-18 22:17:57 +08:00

12 KiB

7. Query Coordinator

7.1 Overview

7.2 Query Coordinator Interface

// QueryCoord is the service interface of the query coordinator. It embeds
// Component and TimeTickProvider; every other method takes a context (and,
// except for CreateQueryChannel, a request message) and returns a response
// message or a *commonpb.Status together with an error.
type QueryCoord interface {
	Component
	TimeTickProvider

	// ShowCollections lists collections; the response carries collection IDs
	// and their in-memory percentages.
	ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error)
	// LoadCollection requests loading of a collection's data.
	LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
	// ReleaseCollection requests release of a collection's data.
	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
	// ShowPartitions lists partitions of a collection; the response carries
	// partition IDs and their in-memory percentages.
	ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error)
	// LoadPartitions requests loading of the data of the given partitions.
	LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error)
	// ReleasePartitions requests release of the data of the given partitions.
	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error)
	// CreateQueryChannel creates the channels for querying in QueryCoord; the
	// response names a request channel and a result channel.
	CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
	// GetPartitionStates reports a state for each partition listed in req.
	GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error)
	// GetSegmentInfo returns information about the segments listed in req.
	GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error)
	// GetMetrics gets the metrics about QueryCoord.
	GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
}
  • MsgBase
// MsgBase is the common message header. The request structs in this chapter
// carry it in their Base field (as *commonpb.MsgBase).
type MsgBase struct {
	MsgType   MsgType   // kind of the message
	MsgID     UniqueID  // identifier of the message
	Timestamp Timestamp // timestamp attached to the message
	SourceID  UniqueID  // presumably the ID of the sending component — confirm
}
  • ShowCollections
// ShowCollectionsRequest is the request message of QueryCoord.ShowCollections
// (referenced there as querypb.ShowCollectionsRequest).
type ShowCollectionsRequest struct {
	Base          *commonpb.MsgBase // common message header
	DbID          UniqueID
	CollectionIDs []int64 // presumably the collections to report on — confirm
}

// ShowCollectionsResponse is the response message of
// QueryCoord.ShowCollections (referenced there as
// querypb.ShowCollectionsResponse).
type ShowCollectionsResponse struct {
	Status              *commonpb.Status // outcome of the call
	CollectionIDs       []UniqueID
	InMemoryPercentages []int64 // presumably index-aligned with CollectionIDs — confirm
}
  • LoadCollection
// LoadCollectionRequest is the request message of QueryCoord.LoadCollection.
// The Schema field is exported, like the Schema field of the other load and
// watch requests in this chapter.
type LoadCollectionRequest struct {
	Base         *commonpb.MsgBase // common message header
	DbID         UniqueID
	CollectionID UniqueID
	Schema       *schemapb.CollectionSchema // schema of the collection being loaded
}
  • ReleaseCollection
// ReleaseCollectionRequest is the request message of
// QueryCoord.ReleaseCollection. It carries the common message header plus a
// database ID and a collection ID (presumably identifying the collection to
// release — confirm).
type ReleaseCollectionRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
}
  • ShowPartitions
// ShowPartitionsRequest is the request message of QueryCoord.ShowPartitions
// (referenced there as querypb.ShowPartitionsRequest).
type ShowPartitionsRequest struct {
	Base         *commonpb.MsgBase // common message header
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []int64 // presumably the partitions to report on — confirm
}

// ShowPartitionsResponse is the response message of QueryCoord.ShowPartitions
// (referenced there as querypb.ShowPartitionsResponse).
type ShowPartitionsResponse struct {
	Status              *commonpb.Status // outcome of the call
	PartitionIDs        []UniqueID
	InMemoryPercentages []int64 // presumably index-aligned with PartitionIDs — confirm
}
  • GetPartitionStates
// PartitionState is the state reported for a partition by
// QueryCoord.GetPartitionStates. It is an alias of int, not a distinct type.
type PartitionState = int

// Partition states. The values are the consecutive integers 0 through 6 in
// declaration order, so entries must not be reordered or inserted in the
// middle.
const (
	PartitionState_NotExist PartitionState = iota
	PartitionState_NotPresent
	PartitionState_OnDisk
	PartitionState_PartialInMemory
	PartitionState_InMemory
	PartitionState_PartialInGPU
	PartitionState_InGPU
)

// GetPartitionStatesRequest is the request message of
// QueryCoord.GetPartitionStates (referenced there as
// querypb.GetPartitionStatesRequest).
type GetPartitionStatesRequest struct {
	Base         *commonpb.MsgBase // common message header
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID // presumably the partitions whose state is requested — confirm
}

// PartitionStates pairs a partition ID with a PartitionState value. The
// GetPartitionStates response carries a list of these.
type PartitionStates struct {
	PartitionID UniqueID
	State       PartitionState
}

// GetPartitionStatesResponse is the response message of
// QueryCoord.GetPartitionStates (referenced there as
// querypb.GetPartitionStatesResponse).
type GetPartitionStatesResponse struct {
	Status                *commonpb.Status   // outcome of the call
	PartitionDescriptions []*PartitionStates // ID/state pairs for the reported partitions
}
  • LoadPartitions
// LoadPartitionsRequest is the request message of QueryCoord.LoadPartitions
// (referenced there as querypb.LoadPartitionsRequest).
type LoadPartitionsRequest struct {
	Base         *commonpb.MsgBase // common message header
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID                 // presumably the partitions to load — confirm
	Schema       *schemapb.CollectionSchema // schema of the owning collection
}
  • ReleasePartitions
// ReleasePartitionsRequest is the request message of
// QueryCoord.ReleasePartitions (referenced there as
// querypb.ReleasePartitionsRequest).
type ReleasePartitionsRequest struct {
	Base         *commonpb.MsgBase // common message header
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID // presumably the partitions to release — confirm
}
  • CreateQueryChannel
// CreateQueryChannelResponse is the response message of
// QueryCoord.CreateQueryChannel. Besides the call status it carries two
// channel names: one for requests and one for results.
type CreateQueryChannelResponse struct {
	Status             *commonpb.Status
	RequestChannelName string
	ResultChannelName  string
}
  • GetSegmentInfo
// GetSegmentInfoRequest is the request message of QueryCoord.GetSegmentInfo.
// It carries the common message header and a list of segment IDs (presumably
// the segments to describe — confirm).
type GetSegmentInfoRequest struct {
	Base       *commonpb.MsgBase
	SegmentIDs []UniqueID
}

// SegmentInfo describes one segment; GetSegmentInfoResponse carries a list of
// these. It records the segment, collection and partition IDs together with
// size, row-count and index fields.
// NOTE(review): MemSize and NumRows are typed UniqueID although their names
// suggest a size and a count rather than identifiers — confirm the intended type.
type SegmentInfo struct {
	SegmentID    UniqueID
	CollectionID UniqueID
	PartitionID  UniqueID
	MemSize      UniqueID
	NumRows      UniqueID
	IndexName    string
	IndexID      UniqueID
}

// GetSegmentInfoResponse is the response message of GetSegmentInfo: the call
// status plus a list of SegmentInfo entries.
type GetSegmentInfoResponse struct {
	Status *commonpb.Status
	Infos  []*SegmentInfo
}

7.3 Query Channel

  • SearchMsg
// SearchRequest is the payload embedded in SearchMsg. It carries the common
// message header, the ID of a result channel, the database/collection/partition
// IDs, the query itself (Dsl with its DslType, a placeholder group and a
// serialized expression plan), the output field IDs and two timestamps.
// NOTE(review): the exact meaning of TravelTimestamp and GuaranteeTimestamp is
// not shown in this chapter — confirm before relying on it.
type SearchRequest struct {
	Base               *commonpb.MsgBase
	ResultChannelID    string
	DbID               int64
	CollectionID       int64
	PartitionIDs       []int64
	Dsl                string
	PlaceholderGroup   []byte
	DslType            commonpb.DslType
	SerializedExprPlan []byte
	OutputFieldsId     []int64
	TravelTimestamp    uint64
	GuaranteeTimestamp uint64
}

// SearchMsg is the search message sent over the query channel. It embeds
// BaseMsg and SearchRequest, so the fields of both are promoted onto SearchMsg.
type SearchMsg struct {
	BaseMsg
	SearchRequest
}
  • RetrieveMsg
// RetrieveRequest is the payload embedded in RetrieveMsg. It has the same
// fields as SearchRequest except that it carries no Dsl, DslType or
// PlaceholderGroup; the query is expressed only by SerializedExprPlan.
type RetrieveRequest struct {
	Base               *commonpb.MsgBase
	ResultChannelID    string
	DbID               int64
	CollectionID       int64
	PartitionIDs       []int64
	SerializedExprPlan []byte
	OutputFieldsId     []int64
	TravelTimestamp    uint64
	GuaranteeTimestamp uint64
}

// RetrieveMsg is the retrieve message sent over the query channel. It embeds
// BaseMsg and RetrieveRequest, so the fields of both are promoted onto
// RetrieveMsg.
type RetrieveMsg struct {
	BaseMsg
	RetrieveRequest
}

7.4 Query Node Interface

// QueryNode is the service interface of a query node. It embeds Component and
// TimeTickProvider; every other method takes a context and a request message
// and returns a response message or a *commonpb.Status together with an error.
type QueryNode interface {
	Component
	TimeTickProvider

	// AddQueryChannel notifies QueryNode to subscribe a query channel and be a producer of a query result channel.
	AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error)
	// RemoveQueryChannel removes the query channel for QueryNode component.
	RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error)
	// WatchDmChannels watches the channels about data manipulation.
	WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error)
	// LoadSegments notifies QueryNode to load the sealed segments from storage. The load tasks are sync to this
	// rpc, QueryNode will return after all the sealed segments are loaded.
	LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error)
	// ReleaseCollection requests release of a collection's data on this QueryNode.
	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
	// ReleasePartitions requests release of the given partitions' data on this QueryNode.
	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error)
	// ReleaseSegments releases the data of the specified segments in QueryNode.
	ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error)
	// GetSegmentInfo returns information about the segments listed in req.
	GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error)
	// GetMetrics gets the metrics about QueryNode.
	// NOTE(review): unlike every other method here (and QueryCoord.GetMetrics),
	// this signature names the request `in` and takes variadic grpc.CallOption,
	// which looks like the gRPC client form — confirm against the interface.
	GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error)
}
  • AddQueryChannel
// AddQueryChannelRequest is the request message of QueryNode.AddQueryChannel.
// It carries the common message header, a node ID, a collection ID and the IDs
// of a request channel and a result channel.
type AddQueryChannelRequest struct {
	Base             *commonpb.MsgBase
	NodeID           int64
	CollectionID     int64
	RequestChannelID string
	ResultChannelID  string
}
  • RemoveQueryChannel
// RemoveQueryChannelRequest is the request message of
// QueryNode.RemoveQueryChannel. Its fields are identical to those of
// AddQueryChannelRequest.
type RemoveQueryChannelRequest struct {
	Base             *commonpb.MsgBase
	NodeID           int64
	CollectionID     int64
	RequestChannelID string
	ResultChannelID  string
}
  • WatchDmChannels

// WatchDmChannelsRequest is the request message of QueryNode.WatchDmChannels.
// Infos lists virtual-channel descriptors (datapb.VchannelInfo) and
// ExcludeInfos lists segment descriptors (datapb.SegmentInfo); presumably the
// former are the channels to watch and the latter segments to skip — confirm.
type WatchDmChannelsRequest struct {
	Base         *commonpb.MsgBase
	NodeID       int64
	CollectionID int64
	PartitionID  int64
	Infos        []*datapb.VchannelInfo
	Schema       *schemapb.CollectionSchema
	ExcludeInfos []*datapb.SegmentInfo
}
  • LoadSegments
// LoadSegmentsRequest is the request message of QueryNode.LoadSegments. Infos
// lists one SegmentLoadInfo per entry and Schema carries a collection schema.
// NOTE(review): SegmentLoadInfo and TriggerCondition are not defined in this
// chapter — see their declarations for the meaning of Infos and LoadCondition.
type LoadSegmentsRequest struct {
	Base          *commonpb.MsgBase
	NodeID        int64
	Infos         []*SegmentLoadInfo
	Schema        *schemapb.CollectionSchema
	LoadCondition TriggerCondition
}
  • ReleaseCollection
// ReleaseCollectionRequest is the request message of
// QueryNode.ReleaseCollection. Compared with the same-named struct listed in
// section 7.2, this listing additionally carries NodeID.
type ReleaseCollectionRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	NodeID       int64
}
  • ReleasePartitions
// ReleasePartitionsRequest is the request message of
// QueryNode.ReleasePartitions. Compared with the request struct listed for
// QueryCoord.ReleasePartitions in section 7.2, it additionally carries NodeID.
type ReleasePartitionsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
	NodeID       int64
}
  • ReleaseSegments
// ReleaseSegmentsRequest is the request message of QueryNode.ReleaseSegments.
// It carries the common message header, a node ID, database and collection
// IDs, and lists of partition IDs and segment IDs.
type ReleaseSegmentsRequest struct {
	Base         *commonpb.MsgBase
	NodeID       int64
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
	SegmentIDs   []UniqueID
}
  • GetSegmentInfo
// GetSegmentInfoRequest is the request message of QueryNode.GetSegmentInfo.
// It carries the common message header and a list of segment IDs.
type GetSegmentInfoRequest struct {
	Base       *commonpb.MsgBase // common message header
	SegmentIDs []UniqueID        // presumably the segments to describe — confirm
}

// GetSegmentInfoResponse is the response message of QueryNode.GetSegmentInfo:
// the call status plus a list of SegmentInfo entries.
type GetSegmentInfoResponse struct {
	Status *commonpb.Status
	Infos  []*SegmentInfo
}

//TODO

7.5 Collection Replica

collectionReplica contains an in-memory local copy of persistent collections. In common cases, the system has multiple query nodes. Data of a collection will be distributed across all the available query nodes, and each query node's collectionReplica will maintain its own share (only part of the collection). Every replica tracks a value called tSafe, which is the maximum timestamp up to which the replica is up-to-date.

  • Collection Replica
// collectionReplica is a query node's in-memory registry of collections,
// partitions and segments, each keyed by its UniqueID, plus per-collection
// tSafer values and per-collection lists of excluded segment descriptors.
// NOTE(review): mu is commented "guards all"; whether that includes tSafes
// (declared above it) is not shown here — confirm.
type collectionReplica struct {
	tSafes map[UniqueID]tSafer // map[collectionID]tSafer

	mu          sync.RWMutex // guards all
	collections map[UniqueID]*Collection
	partitions  map[UniqueID]*Partition
	segments    map[UniqueID]*Segment

	excludedSegments map[UniqueID][]*datapb.SegmentInfo // map[collectionID]segmentInfos
}
  • Collection
// FieldSchema describes one field of a collection: its ID, name, description
// and data type, whether it is the primary key, and two lists of key/value
// parameters (type parameters and index parameters).
type FieldSchema struct {
	FieldID      int64
	Name         string
	IsPrimaryKey bool
	Description  string
	DataType     DataType
	TypeParams   []*commonpb.KeyValuePair
	IndexParams  []*commonpb.KeyValuePair
}

// CollectionSchema describes a collection: its name and description, an
// AutoID flag, and the list of its field schemas.
type CollectionSchema struct {
	Name        string
	Description string
	AutoID      bool
	Fields      []*FieldSchema
}

// Collection is the query node's in-memory record of one collection. It holds
// a C.CCollection handle, the collection ID, the IDs of its partitions, its
// schema, its virtual and physical channels and its load type.
// The second field group tracks releases: a set of released partition IDs and
// a release timestamp, declared next to releaseMu (presumably the lock that
// protects them — confirm).
type Collection struct {
	collectionPtr C.CCollection
	id            UniqueID
	partitionIDs  []UniqueID
	schema        *schemapb.CollectionSchema
	vChannels     []Channel
	pChannels     []Channel
	loadType      loadType

	releaseMu          sync.RWMutex
	releasedPartitions map[UniqueID]struct{}
	releaseTime        Timestamp
}
  • Partition
// Partition is the query node's in-memory record of one partition: the ID of
// its collection, its own ID, and the IDs of its segments.
type Partition struct {
	collectionID UniqueID
	partitionID  UniqueID
	segmentIDs   []UniqueID
}
  • Segment
// segmentType classifies a Segment. It is a distinct type based on int32.
type segmentType int32

// Segment types, numbered from 0 by iota in declaration order, so
// segmentTypeInvalid is the zero value of segmentType.
const (
	segmentTypeInvalid segmentType = iota
	segmentTypeGrowing
	segmentTypeSealed
	segmentTypeIndexing
)

// indexParam is an alias for a string-to-string map (presumably index
// parameter names to values — confirm).
type indexParam = map[string]string

// Segment is the query node's in-memory record of one segment. It holds a
// C.CSegmentInterface handle, the segment/partition/collection IDs, cached
// size and row-count values, index metadata and a primary-key bloom filter.
// Fields that carry a "guards" comment are locks declared directly above the
// field they protect.
type Segment struct {
	segmentPtr C.CSegmentInterface

	segmentID    UniqueID
	partitionID  UniqueID
	collectionID UniqueID

	onService bool

	vChannelID   Channel
	lastMemSize  int64
	lastRowCount int64

	once        sync.Once // one-time gate for enableIndex (NOTE(review): confirm usage)
	enableIndex bool

	rmMutex          sync.Mutex // guards recentlyModified
	recentlyModified bool

	typeMu      sync.Mutex // guards segmentType
	segmentType segmentType

	paramMutex sync.RWMutex // guards indexInfos
	indexInfos map[FieldID]*indexInfo

	idBinlogRowSizes []int64

	vectorFieldMutex sync.RWMutex // guards vectorFieldInfos
	vectorFieldInfos map[UniqueID]*VectorFieldInfo

	pkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
}
  • Data Sync Service
// dataSyncService holds the query node's flow graphs, indexed first by
// collection ID or partition ID and then by channel, together with the
// replicas and message-stream factory it works with.
type dataSyncService struct {
	ctx context.Context

	mu                   sync.Mutex                                   // guards collectionFlowGraphs and partitionFlowGraphs
	collectionFlowGraphs map[UniqueID]map[Channel]*queryNodeFlowGraph // map[collectionID]flowGraphs
	partitionFlowGraphs  map[UniqueID]map[Channel]*queryNodeFlowGraph // map[partitionID]flowGraphs

	streamingReplica ReplicaInterface
	tSafeReplica     TSafeReplicaInterface
	msFactory        msgstream.Factory
}