2021-12-21 19:19:21 +08:00
|
|
|
// Licensed to the LF AI & Data foundation under one
|
|
|
|
// or more contributor license agreements. See the NOTICE file
|
|
|
|
// distributed with this work for additional information
|
|
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
|
|
// to you under the Apache License, Version 2.0 (the
|
|
|
|
// "License"); you may not use this file except in compliance
|
2021-04-19 11:12:56 +08:00
|
|
|
// with the License. You may obtain a copy of the License at
|
|
|
|
//
|
2021-12-21 19:19:21 +08:00
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
2021-04-19 11:12:56 +08:00
|
|
|
//
|
2021-12-21 19:19:21 +08:00
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2021-04-19 11:12:56 +08:00
|
|
|
|
2021-06-18 21:30:08 +08:00
|
|
|
package rootcoord
|
2021-01-21 10:01:29 +08:00
|
|
|
|
2021-02-02 10:09:10 +08:00
|
|
|
import (
|
2021-05-14 21:26:06 +08:00
|
|
|
"encoding/json"
|
2021-03-05 10:15:27 +08:00
|
|
|
"fmt"
|
|
|
|
|
2022-08-04 11:04:34 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/log"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2022-07-22 10:20:29 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/common"
|
|
|
|
|
|
|
|
"github.com/milvus-io/milvus/internal/metastore/model"
|
|
|
|
|
2021-05-14 21:26:06 +08:00
|
|
|
"github.com/golang/protobuf/proto"
|
2022-03-03 21:57:56 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/mq/msgstream"
|
2021-04-22 14:45:57 +08:00
|
|
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
|
|
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
2021-02-02 10:09:10 +08:00
|
|
|
)
|
2021-01-21 10:01:29 +08:00
|
|
|
|
2022-08-04 11:04:34 +08:00
|
|
|
var logger = log.L().WithOptions(zap.Fields(zap.String("role", typeutil.RootCoordRole)))
|
|
|
|
|
2021-09-23 15:10:00 +08:00
|
|
|
// EqualKeyPairArray check whether 2 KeyValuePairs are equal
|
2021-01-21 10:01:29 +08:00
|
|
|
func EqualKeyPairArray(p1 []*commonpb.KeyValuePair, p2 []*commonpb.KeyValuePair) bool {
|
|
|
|
if len(p1) != len(p2) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
m1 := make(map[string]string)
|
|
|
|
for _, p := range p1 {
|
|
|
|
m1[p.Key] = p.Value
|
|
|
|
}
|
|
|
|
for _, p := range p2 {
|
|
|
|
val, ok := m1[p.Key]
|
|
|
|
if !ok {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if val != p.Value {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
2021-02-02 10:09:10 +08:00
|
|
|
|
2021-09-23 15:10:00 +08:00
|
|
|
// GetFieldSchemaByID return field schema by id
|
2022-07-22 10:20:29 +08:00
|
|
|
func GetFieldSchemaByID(coll *model.Collection, fieldID typeutil.UniqueID) (*model.Field, error) {
|
|
|
|
for _, f := range coll.Fields {
|
2021-02-02 10:09:10 +08:00
|
|
|
if f.FieldID == fieldID {
|
|
|
|
return f, nil
|
|
|
|
}
|
|
|
|
}
|
2021-03-05 10:15:27 +08:00
|
|
|
return nil, fmt.Errorf("field id = %d not found", fieldID)
|
2021-02-02 10:09:10 +08:00
|
|
|
}
|
2021-04-27 10:30:55 +08:00
|
|
|
|
2021-09-23 15:10:00 +08:00
|
|
|
// GetFieldSchemaByIndexID return field schema by it's index id
|
2022-07-22 10:20:29 +08:00
|
|
|
func GetFieldSchemaByIndexID(coll *model.Collection, idxID typeutil.UniqueID) (*model.Field, error) {
|
2021-04-27 10:30:55 +08:00
|
|
|
var fieldID typeutil.UniqueID
|
|
|
|
exist := false
|
2022-07-22 10:20:29 +08:00
|
|
|
for _, t := range coll.FieldIDToIndexID {
|
|
|
|
if t.Value == idxID {
|
|
|
|
fieldID = t.Key
|
2021-04-27 10:30:55 +08:00
|
|
|
exist = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !exist {
|
|
|
|
return nil, fmt.Errorf("index id = %d is not attach to any field", idxID)
|
|
|
|
}
|
|
|
|
return GetFieldSchemaByID(coll, fieldID)
|
|
|
|
}
|
2021-05-14 21:26:06 +08:00
|
|
|
|
|
|
|
// EncodeDdOperation serialize DdOperation into string
|
2021-07-06 09:16:03 +08:00
|
|
|
func EncodeDdOperation(m proto.Message, ddType string) (string, error) {
|
2021-09-23 10:37:54 +08:00
|
|
|
mByte, err := proto.Marshal(m)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
2021-05-14 21:26:06 +08:00
|
|
|
ddOp := DdOperation{
|
2021-09-23 10:37:54 +08:00
|
|
|
Body: mByte,
|
2021-07-06 09:16:03 +08:00
|
|
|
Type: ddType,
|
2021-05-14 21:26:06 +08:00
|
|
|
}
|
|
|
|
ddOpByte, err := json.Marshal(ddOp)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
return string(ddOpByte), nil
|
|
|
|
}
|
2021-05-15 18:08:08 +08:00
|
|
|
|
2021-06-02 22:36:41 +08:00
|
|
|
// DecodeDdOperation deserialize string to DdOperation
|
|
|
|
func DecodeDdOperation(str string, ddOp *DdOperation) error {
|
|
|
|
return json.Unmarshal([]byte(str), ddOp)
|
|
|
|
}
|
|
|
|
|
2021-05-25 11:42:23 +08:00
|
|
|
// EncodeMsgPositions serialize []*MsgPosition into string
|
|
|
|
func EncodeMsgPositions(msgPositions []*msgstream.MsgPosition) (string, error) {
|
|
|
|
if len(msgPositions) == 0 {
|
|
|
|
return "", nil
|
|
|
|
}
|
|
|
|
resByte, err := json.Marshal(msgPositions)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
return string(resByte), nil
|
|
|
|
}
|
2021-06-02 22:36:41 +08:00
|
|
|
|
|
|
|
// DecodeMsgPositions deserialize string to []*MsgPosition
|
|
|
|
func DecodeMsgPositions(str string, msgPositions *[]*msgstream.MsgPosition) error {
|
|
|
|
if str == "" || str == "null" {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return json.Unmarshal([]byte(str), msgPositions)
|
|
|
|
}
|
2022-07-22 10:20:29 +08:00
|
|
|
|
|
|
|
func Int64TupleSliceToMap(s []common.Int64Tuple) map[int]common.Int64Tuple {
|
|
|
|
ret := make(map[int]common.Int64Tuple, len(s))
|
|
|
|
for i, e := range s {
|
|
|
|
ret[i] = e
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
func Int64TupleMapToSlice(s map[int]common.Int64Tuple) []common.Int64Tuple {
|
|
|
|
ret := make([]common.Int64Tuple, 0, len(s))
|
|
|
|
for _, e := range s {
|
|
|
|
ret = append(ret, e)
|
|
|
|
}
|
|
|
|
return ret
|
|
|
|
}
|