milvus/docs/developer_guides/chap07_query_coordinator.md
xige-16 2af1c27811
Rename queryservice to querycoordinator (#5931)
* Rename queryservice to querycoordinator

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* fix param table

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* code format

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* fix unittest

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* service to coordinator

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* code format

Signed-off-by: xige-16 <xi.ge@zilliz.com>

* set querycoord in ci

Signed-off-by: xige-16 <xi.ge@zilliz.com>
2021-06-22 16:44:09 +08:00

9.5 KiB

8. Query Coordinator

8.1 Overview

8.2 Query Coordinator Interface

// QueryCoord is the query coordinator interface documented in this chapter.
// It embeds Component and TimeTickProvider; every method takes a
// context.Context and returns an error alongside its response or status.
type QueryCoord interface {
	Component
	TimeTickProvider

	// Node registration.
	RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error)
	// Collection-level calls.
	ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error)
	LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error)
	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
	// Partition-level calls.
	ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error)
	LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error)
	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error)
	// CreateQueryChannel is the only call that takes no request message.
	CreateQueryChannel(ctx context.Context) (*querypb.CreateQueryChannelResponse, error)
	// State and segment queries.
	GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error)
	GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error)
}
  • MsgBase
// MsgBase groups a message type, a message ID, a timestamp and a source ID.
// The request types in this chapter carry it in their Base field.
type MsgBase struct {
	MsgType   MsgType
	MsgID     UniqueID
	Timestamp Timestamp
	SourceID  UniqueID
}
  • RegisterNode
// Address pairs an Ip string with a Port number.
// Both fields are exported so that code outside the declaring package can
// read and set them.
type Address struct {
	Ip   string
	Port int64
}

// RegisterNodeRequest is the request type listed under the RegisterNode call.
// It carries the common MsgBase and the Address being registered.
type RegisterNodeRequest struct {
	Base    *commonpb.MsgBase
	Address *commonpb.Address
}

// RegisterNodeResponse is the response type listed under the RegisterNode call.
// It carries a Status together with InitParams.
// NOTE(review): the contents of internalpb.InitParams are not shown in this chapter.
type RegisterNodeResponse struct {
	Status     *commonpb.Status
	InitParams *internalpb.InitParams
}
  • ShowCollections
// ShowCollectionsRequest is the request type of QueryCoord.ShowCollections.
// It carries the common MsgBase and the ID of the database to list.
type ShowCollectionsRequest struct {
	Base *commonpb.MsgBase
	DbID UniqueID
}

// ShowCollectionsResponse is the response type of QueryCoord.ShowCollections.
// It carries a Status and a list of collection IDs.
type ShowCollectionsResponse struct {
	Status        *commonpb.Status
	CollectionIDs []UniqueID
}
  • LoadCollection
// LoadCollectionRequest is the request type of QueryCoord.LoadCollection.
// It identifies a collection by database ID and collection ID and carries the
// collection schema. Schema is exported, matching the Schema field of the
// other load requests in this chapter.
type LoadCollectionRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	Schema       *schemapb.CollectionSchema
}
  • ReleaseCollection
// ReleaseCollectionRequest is the request type listed under the
// ReleaseCollection call. It identifies a collection by database ID and
// collection ID.
type ReleaseCollectionRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
}
  • ShowPartitions
// ShowPartitionsRequest is the request type of QueryCoord.ShowPartitions.
// It identifies the collection whose partitions are listed.
type ShowPartitionsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
}

// ShowPartitionsResponse is the response type of QueryCoord.ShowPartitions.
// It carries a Status and a list of partition IDs.
type ShowPartitionsResponse struct {
	Status       *commonpb.Status
	PartitionIDs []UniqueID
}
  • GetPartitionStates
// PartitionState is the state value reported per partition in PartitionStates.
// It is declared as an alias of int.
type PartitionState = int

// Partition state values, numbered 0 through 6.
// NOTE(review): the exact meaning of each state is only suggested by its name
// here — confirm against the definition these constants mirror.
const (
	PartitionState_NotExist        PartitionState = 0
	PartitionState_NotPresent      PartitionState = 1
	PartitionState_OnDisk          PartitionState = 2
	PartitionState_PartialInMemory PartitionState = 3
	PartitionState_InMemory        PartitionState = 4
	PartitionState_PartialInGPU    PartitionState = 5
	PartitionState_InGPU           PartitionState = 6
)

// GetPartitionStatesRequest is the request type of QueryCoord.GetPartitionStates.
// It names a collection and the partitions whose states are requested.
type GetPartitionStatesRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
}

// PartitionStates associates one partition ID with its PartitionState.
type PartitionStates struct {
	PartitionID UniqueID
	State       PartitionState
}

// GetPartitionStatesResponse is the response type of QueryCoord.GetPartitionStates.
// It carries a Status and one PartitionStates entry per described partition.
type GetPartitionStatesResponse struct {
	Status                *commonpb.Status
	PartitionDescriptions []*PartitionStates
}
  • LoadPartitions
// LoadPartitionsRequest is the request type of QueryCoord.LoadPartitions.
// It names a collection, the partitions to load, and the collection schema.
type LoadPartitionsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
	Schema       *schemapb.CollectionSchema
}
  • ReleasePartitions
// ReleasePartitionsRequest is the request type of QueryCoord.ReleasePartitions.
// It names a collection and the partitions to release.
type ReleasePartitionsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
}
  • CreateQueryChannel
// CreateQueryChannelResponse is the response type of QueryCoord.CreateQueryChannel.
// It carries a Status plus the names of a request channel and a result channel.
type CreateQueryChannelResponse struct {
	Status             *commonpb.Status
	RequestChannelName string
	ResultChannelName  string
}
  • GetSegmentInfo *
// GetSegmentInfoRequest is the request type listed under the GetSegmentInfo
// call. It names the segments whose information is requested.
type GetSegmentInfoRequest struct {
	Base       *commonpb.MsgBase
	SegmentIDs []UniqueID
}

// SegmentInfo describes one segment: the IDs of the segment, its collection
// and its partition, its memory size and row count, and the name and ID of
// its index.
type SegmentInfo struct {
	SegmentID    UniqueID
	CollectionID UniqueID
	PartitionID  UniqueID
	// MemSize and NumRows are quantities rather than identifiers, so they use
	// int64, like lastMemSize and lastRowCount on Segment.
	MemSize   int64
	NumRows   int64
	IndexName string
	IndexID   UniqueID
}

// GetSegmentInfoResponse is the response type listed under the GetSegmentInfo
// call. It carries a Status and a list of SegmentInfo entries.
type GetSegmentInfoResponse struct {
	Status *commonpb.Status
	Infos  []*SegmentInfo
}

8.2 Query Channel

  • SearchMsg
// SearchRequest is the payload of a SearchMsg sent over the query channel.
// It names the result channel, the database, collection and partitions
// involved, and carries the search itself as a Dsl string, a serialized
// placeholder group and a Query blob.
type SearchRequest struct {
	Base            *commonpb.MsgBase
	ResultChannelID string
	DbID            int64
	CollectionID    int64
	PartitionIDs    []int64
	Dsl             string
	// serialized `PlaceholderGroup`
	PlaceholderGroup []byte
	Query            *commonpb.Blob
}

// SearchMsg is the message form of a search: it embeds BaseMsg and
// SearchRequest, so the fields of both are promoted onto SearchMsg.
type SearchMsg struct {
	BaseMsg
	SearchRequest
}

8.2 Query Node Interface

// QueryNode is the query node interface documented in this chapter.
// It embeds Component and TimeTickProvider; every method takes a
// context.Context and a querypb request and returns an error alongside its
// status or response.
type QueryNode interface {
	Component
	TimeTickProvider

	// Channel management.
	AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error)
	RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error)
	WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error)
	// Loading and releasing data.
	LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error)
	ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error)
	ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error)
	ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error)
	// GetSegmentInfo is the only call here that returns a dedicated response type.
	GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error)
}
  • AddQueryChannel
// AddQueryChannelRequest is the request type listed under the AddQueryChannel
// call. It carries the IDs of a request channel and a result channel.
type AddQueryChannelRequest struct {
	Base             *commonpb.MsgBase
	RequestChannelID string
	ResultChannelID  string
}
  • RemoveQueryChannel
// RemoveQueryChannelRequest is the request type listed under the
// RemoveQueryChannel call. It carries the IDs of a request channel and a
// result channel.
// NOTE(review): this is the only request type in the chapter with a Status
// field; confirm it is intentional and not copied from a response.
type RemoveQueryChannelRequest struct {
	Status           *commonpb.Status
	Base             *commonpb.MsgBase
	RequestChannelID string
	ResultChannelID  string
}
  • WatchDmChannels
// WatchDmChannelInfo describes one channel to watch: its ID, a message
// position, and a list of excluded segment IDs.
// NOTE(review): presumably Pos is where consumption starts — confirm.
type WatchDmChannelInfo struct {
	ChannelID        string
	Pos              *internalpb.MsgPosition
	ExcludedSegments []int64
}

// WatchDmChannelsRequest is the request type of QueryNode.WatchDmChannels.
// It names a collection and the channels to watch, and carries one
// WatchDmChannelInfo per entry in Infos.
type WatchDmChannelsRequest struct {
	Base         *commonpb.MsgBase
	CollectionID int64
	ChannelIDs   []string
	Infos        []*WatchDmChannelInfo
}
  • LoadSegments
// LoadSegmentsRequest is the request type listed under the LoadSegments call.
// It names a database, collection and partition, lists segment IDs and field
// IDs, and carries per-segment state info and the collection schema.
type LoadSegmentsRequest struct {
	Base          *commonpb.MsgBase
	DbID          UniqueID
	CollectionID  UniqueID
	PartitionID   UniqueID
	SegmentIDs    []UniqueID
	FieldIDs      []UniqueID
	SegmentStates []*datapb.SegmentStateInfo
	Schema        *schemapb.CollectionSchema
}
  • ReleaseCollection
// ReleaseCollectionRequest is the request type listed under the query node's
// ReleaseCollection call. It identifies a collection by database ID and
// collection ID.
type ReleaseCollectionRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
}
  • ReleasePartitions
// ReleasePartitionsRequest is the request type listed under the query node's
// ReleasePartitions call. It names a collection and the partitions to release.
type ReleasePartitionsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
}
  • ReleaseSegments
// ReleaseSegmentsRequest is the request type listed under the ReleaseSegments
// call. It names a collection, a list of partitions and the segments to release.
type ReleaseSegmentsRequest struct {
	Base         *commonpb.MsgBase
	DbID         UniqueID
	CollectionID UniqueID
	PartitionIDs []UniqueID
	SegmentIDs   []UniqueID
}
  • GetSegmentInfo
// GetSegmentInfoRequest is the request type of QueryNode.GetSegmentInfo.
// It names the segments whose information is requested.
type GetSegmentInfoRequest struct {
	Base       *commonpb.MsgBase
	SegmentIDs []UniqueID
}

// GetSegmentInfoResponse is the response type listed under the query node's
// GetSegmentInfo call. It carries a Status and a list of SegmentInfo entries.
type GetSegmentInfoResponse struct {
	Status *commonpb.Status
	Infos  []*SegmentInfo

}

//TODO

8.2 Collection Replica

collectionReplica contains an in-memory local copy of persistent collections. In common cases, the system has multiple query nodes. The data of a collection is distributed across all the available query nodes, and each query node's collectionReplica maintains its own share (only part of the collection). Every replica tracks a value called tSafe, which is the maximum timestamp up to which the replica is up-to-date.

8.1.1 Collection Replica
// collectionReplica holds a query node's in-memory view of collections,
// partitions and segments, each indexed by UniqueID, together with per-collection
// tSafer values and per-collection lists of excluded segment IDs.
type collectionReplica struct {
	tSafes map[UniqueID]tSafer // map[collectionID]tSafer

	mu          sync.RWMutex // guards all
	collections map[UniqueID]*Collection
	partitions  map[UniqueID]*Partition
	segments    map[UniqueID]*Segment

	excludedSegments map[UniqueID][]UniqueID // map[collectionID]segmentIDs
}
8.1.2 Collection
// FieldSchema describes one field of a collection: its ID, name, whether it is
// the primary key, a description, its data type, and two lists of key/value
// parameters (type parameters and index parameters).
type FieldSchema struct {
	FieldID      int64
	Name         string
	IsPrimaryKey bool
	Description  string
	DataType     DataType
	TypeParams   []*commonpb.KeyValuePair
	IndexParams  []*commonpb.KeyValuePair
}

// CollectionSchema describes a collection: its name, a description, an AutoID
// flag, and the list of its FieldSchema entries.
// NOTE(review): presumably AutoID means IDs are generated automatically — confirm.
type CollectionSchema struct {
	Name        string
	Description string
	AutoID      bool
	Fields      []*FieldSchema
}

// Collection is the query node's record of one collection: a cgo handle
// (C.CCollection), the collection ID, the IDs of its partitions, and its schema.
type Collection struct {
	collectionPtr C.CCollection
	id            UniqueID
	partitionIDs  []UniqueID
	schema        *schemapb.CollectionSchema
}
8.1.3 Partition
// Partition is the query node's record of one partition: the owning collection
// ID, the partition ID, the IDs of its segments, and an enable flag.
// NOTE(review): what enable switches on or off is not shown here — confirm.
type Partition struct {
	collectionID UniqueID
	partitionID  UniqueID
	segmentIDs   []UniqueID
	enable       bool
}
8.1.4 Segment
// segmentType classifies a Segment; it is stored in Segment.segmentType.
type segmentType int32

// Segment type values. iota makes segmentTypeInvalid the zero value, followed
// by growing, sealed and indexing as 1, 2 and 3.
const (
	segmentTypeInvalid segmentType = iota
	segmentTypeGrowing
	segmentTypeSealed
	segmentTypeIndexing
)
// indexParam is an alias for a string-to-string map of index parameters.
type indexParam = map[string]string

// Segment is the query node's record of one segment: a cgo handle, the
// segment/partition/collection IDs, the last recorded memory size and row
// count, and several small groups of state, each declared next to its own
// synchronization primitive.
type Segment struct {
	segmentPtr C.CSegmentInterface

	segmentID    UniqueID
	partitionID  UniqueID
	collectionID UniqueID
	lastMemSize  int64
	lastRowCount int64

	once             sync.Once // guards enableIndex
	enableIndex      bool
	enableLoadBinLog bool

	rmMutex          sync.Mutex // guards recentlyModified
	recentlyModified bool

	// NOTE(review): originally annotated "guards builtIndex", but this struct
	// has no such field; presumably typeMu guards segmentType — confirm.
	typeMu      sync.Mutex
	segmentType segmentType

	// NOTE(review): "index" presumably covers indexParam, indexName and
	// indexID below — confirm.
	paramMutex sync.RWMutex // guards index
	indexParam map[int64]indexParam
	indexName  string
	indexID    UniqueID
}

8.3 Data Sync Service

// dataSyncService bundles what is needed to sync data for one collection: a
// context with its cancel function, the collection ID, a time-ticked flow
// graph, a message stream with the factory for such streams, and the replica
// it works against.
// NOTE(review): how these pieces are wired together is not shown in this chapter.
type dataSyncService struct {
	ctx    context.Context
	cancel context.CancelFunc

	collectionID UniqueID
	fg           *flowgraph.TimeTickedFlowGraph

	dmStream  msgstream.MsgStream
	msFactory msgstream.Factory

	replica ReplicaInterface
}