Replace MarshalTextString with Marshal in rootcoord (#8341)

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2021-09-23 10:37:54 +08:00 committed by GitHub
parent 81775dff3e
commit a62bce360b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 82 additions and 70 deletions

View File

@ -99,9 +99,9 @@ func (mt *metaTable) reloadFromKV() error {
for _, value := range values {
tenantMeta := pb.TenantMeta{}
err := proto.UnmarshalText(value, &tenantMeta)
err := proto.Unmarshal([]byte(value), &tenantMeta)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.TenantMeta err:%w", err)
}
mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
}
@ -113,9 +113,9 @@ func (mt *metaTable) reloadFromKV() error {
for _, value := range values {
proxyMeta := pb.ProxyMeta{}
err = proto.UnmarshalText(value, &proxyMeta)
err = proto.Unmarshal([]byte(value), &proxyMeta)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.ProxyMeta err:%w", err)
}
mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
}
@ -127,9 +127,9 @@ func (mt *metaTable) reloadFromKV() error {
for _, value := range values {
collInfo := pb.CollectionInfo{}
err = proto.UnmarshalText(value, &collInfo)
err = proto.Unmarshal([]byte(value), &collInfo)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.CollectionInfo err:%w", err)
}
mt.collID2Meta[collInfo.ID] = collInfo
mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
@ -141,9 +141,9 @@ func (mt *metaTable) reloadFromKV() error {
}
for _, value := range values {
segmentIndexInfo := pb.SegmentIndexInfo{}
err = proto.UnmarshalText(value, &segmentIndexInfo)
err = proto.Unmarshal([]byte(value), &segmentIndexInfo)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.SegmentIndexInfo err:%w", err)
}
// update partID2SegID
@ -173,9 +173,9 @@ func (mt *metaTable) reloadFromKV() error {
}
for _, value := range values {
meta := pb.IndexInfo{}
err = proto.UnmarshalText(value, &meta)
err = proto.Unmarshal([]byte(value), &meta)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.IndexInfo err:%w", err)
}
mt.indexID2Meta[meta.IndexID] = meta
}
@ -186,9 +186,9 @@ func (mt *metaTable) reloadFromKV() error {
}
for _, value := range values {
aliasInfo := pb.CollectionInfo{}
err = proto.UnmarshalText(value, &aliasInfo)
err = proto.Unmarshal([]byte(value), &aliasInfo)
if err != nil {
return fmt.Errorf("RootCoord UnmarshalText pb.AliasInfo err:%w", err)
return fmt.Errorf("RootCoord Unmarshal pb.AliasInfo err:%w", err)
}
mt.collAlias2ID[aliasInfo.Schema.Name] = aliasInfo.ID
}
@ -216,9 +216,9 @@ func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
defer mt.tenantLock.Unlock()
k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
v := proto.MarshalTextString(te)
v, _ := proto.Marshal(te)
err := mt.client.Save(k, v, ts)
err := mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
@ -232,9 +232,9 @@ func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
defer mt.proxyLock.Unlock()
k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
v := proto.MarshalTextString(po)
v, _ := proto.Marshal(po)
err := mt.client.Save(k, v, ts)
err := mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
@ -267,8 +267,8 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
for _, i := range idx {
k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
v := proto.MarshalTextString(i)
meta[k] = v
v, _ := proto.Marshal(i)
meta[k] = string(v)
}
// save ddOpStr into etcd
@ -281,9 +281,9 @@ func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestam
mt.collID2Meta[coll.ID] = *coll
mt.collName2ID[coll.Schema.Name] = coll.ID
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
v1 := proto.MarshalTextString(coll)
meta[k1] = v1
return k1, v1, nil
v1, _ := proto.Marshal(coll)
meta[k1] = string(v1)
return k1, string(v1), nil
}
err := mt.client.MultiSave(meta, ts, addition, saveColl)
@ -392,7 +392,7 @@ func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeut
return nil, err
}
colMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(val, &colMeta)
err = proto.Unmarshal([]byte(val), &colMeta)
if err != nil {
return nil, err
}
@ -423,7 +423,7 @@ func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Time
}
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(val, &collMeta)
err = proto.Unmarshal([]byte(val), &collMeta)
if err != nil {
log.Debug("unmarshal collection info failed", zap.Error(err))
continue
@ -455,7 +455,7 @@ func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.Coll
}
for _, val := range vals {
collMeta := pb.CollectionInfo{}
err := proto.UnmarshalText(val, &collMeta)
err := proto.Unmarshal([]byte(val), &collMeta)
if err != nil {
log.Debug("unmarshal collection info failed", zap.Error(err))
}
@ -546,10 +546,10 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string
mt.collID2Meta[collID] = coll
k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
v1 := proto.MarshalTextString(&coll)
meta[k1] = v1
v1, _ := proto.Marshal(&coll)
meta[k1] = string(v1)
return k1, v1, nil
return k1, string(v1), nil
}
err := mt.client.MultiSave(meta, ts, addition, saveColl)
@ -581,7 +581,7 @@ func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID,
return "", err
}
collMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(collVal, &collMeta)
err = proto.Unmarshal([]byte(collVal), &collMeta)
if err != nil {
return "", err
}
@ -612,7 +612,7 @@ func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName
return 0, err
}
collMeta := pb.CollectionInfo{}
err = proto.UnmarshalText(collVal, &collMeta)
err = proto.Unmarshal([]byte(collVal), &collMeta)
if err != nil {
return 0, err
}
@ -683,7 +683,9 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str
}
delete(mt.partID2SegID, partID)
meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
v, _ := proto.Marshal(&collMeta)
meta := map[string]string{k: string(v)}
delMetaKeys := []string{}
for _, idxInfo := range collMeta.FieldIndexes {
k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
@ -744,9 +746,9 @@ func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Times
mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true
k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
v := proto.MarshalTextString(segIdxInfo)
v, _ := proto.Marshal(segIdxInfo)
err := mt.client.Save(k, v, ts)
err := mt.client.Save(k, string(v), ts)
if err != nil {
log.Error("SnapShotKV Save fail", zap.Error(err))
panic("SnapShotKV Save fail")
@ -799,7 +801,9 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeuti
}
collMeta.FieldIndexes = fieldIdxInfo
mt.collID2Meta[collID] = collMeta
saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
v, _ := proto.Marshal(&collMeta)
saveMeta := map[string]string{k: string(v)}
delete(mt.indexID2Meta, dropIdxID)
@ -977,20 +981,20 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
mt.collID2Meta[collMeta.ID] = collMeta
k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
v1 := proto.MarshalTextString(&collMeta)
v1, _ := proto.Marshal(&collMeta)
mt.indexID2Meta[idx.IndexID] = *idxInfo
k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
v2 := proto.MarshalTextString(idxInfo)
meta := map[string]string{k1: v1, k2: v2}
v2, _ := proto.Marshal(idxInfo)
meta := map[string]string{k1: string(v1), k2: string(v2)}
if dupIdx != 0 {
dupInfo := mt.indexID2Meta[dupIdx]
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v := proto.MarshalTextString(&dupInfo)
meta[k] = v
v, _ := proto.Marshal(&dupInfo)
meta[k] = string(v)
}
err = mt.client.MultiSave(meta, ts)
if err != nil {
@ -1003,15 +1007,15 @@ func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, id
existInfo.IndexName = idxInfo.IndexName
mt.indexID2Meta[existInfo.IndexID] = existInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
v := proto.MarshalTextString(&existInfo)
meta := map[string]string{k: v}
v, _ := proto.Marshal(&existInfo)
meta := map[string]string{k: string(v)}
if dupIdx != 0 {
dupInfo := mt.indexID2Meta[dupIdx]
dupInfo.IndexName = dupInfo.IndexName + "_bak"
mt.indexID2Meta[dupIdx] = dupInfo
k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
v := proto.MarshalTextString(&dupInfo)
meta[k] = v
v, _ := proto.Marshal(&dupInfo)
meta[k] = string(v)
}
err = mt.client.MultiSave(meta, ts)
@ -1116,9 +1120,9 @@ func (mt *metaTable) AddAlias(collectionAlias string, collectionName string,
addition := mt.getAdditionKV(ddOpStr, meta)
saveAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1 := proto.MarshalTextString(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
meta[k1] = v1
return k1, v1, nil
v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
meta[k1] = string(v1)
return k1, string(v1), nil
}
err := mt.client.MultiSave(meta, ts, addition, saveAlias)
@ -1166,9 +1170,9 @@ func (mt *metaTable) AlterAlias(collectionAlias string, collectionName string, t
addition := mt.getAdditionKV(ddOpStr, meta)
alterAlias := func(ts typeutil.Timestamp) (string, string, error) {
k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
v1 := proto.MarshalTextString(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
meta[k1] = v1
return k1, v1, nil
v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
meta[k1] = string(v1)
return k1, string(v1), nil
}
err := mt.client.MultiSave(meta, ts, addition, alterAlias)

View File

@ -68,53 +68,58 @@ func Test_MockKV(t *testing.T) {
assert.NotNil(t, err)
assert.EqualError(t, err, "load prefix error")
// tenant
prefix[TenantMetaPrefix] = []string{"tenant-prefix"}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "RootCoord UnmarshalText pb.TenantMeta err:line 1.0: unknown field name \"tenant-prefix\" in milvus.proto.etcd.TenantMeta")
prefix[TenantMetaPrefix] = []string{proto.MarshalTextString(&pb.TenantMeta{})}
value, _ := proto.Marshal(&pb.TenantMeta{})
prefix[TenantMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
// proxy
prefix[ProxyMetaPrefix] = []string{"porxy-meta"}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "RootCoord UnmarshalText pb.ProxyMeta err:line 1.0: unknown field name \"porxy-meta\" in milvus.proto.etcd.ProxyMeta")
prefix[ProxyMetaPrefix] = []string{proto.MarshalTextString(&pb.ProxyMeta{})}
value, _ = proto.Marshal(&pb.ProxyMeta{})
prefix[ProxyMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
// collection
prefix[CollectionMetaPrefix] = []string{"collection-meta"}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "RootCoord UnmarshalText pb.CollectionInfo err:line 1.0: unknown field name \"collection-meta\" in milvus.proto.etcd.CollectionInfo")
prefix[CollectionMetaPrefix] = []string{proto.MarshalTextString(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})}
value, _ = proto.Marshal(&pb.CollectionInfo{Schema: &schemapb.CollectionSchema{}})
prefix[CollectionMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
// segment index
prefix[SegmentIndexMetaPrefix] = []string{"segment-index-meta"}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "RootCoord UnmarshalText pb.SegmentIndexInfo err:line 1.0: unknown field name \"segment-index-meta\" in milvus.proto.etcd.SegmentIndexInfo")
prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{})}
value, _ = proto.Marshal(&pb.SegmentIndexInfo{})
prefix[SegmentIndexMetaPrefix] = []string{string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
prefix[SegmentIndexMetaPrefix] = []string{proto.MarshalTextString(&pb.SegmentIndexInfo{}), proto.MarshalTextString(&pb.SegmentIndexInfo{})}
prefix[SegmentIndexMetaPrefix] = []string{string(value), string(value)}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "load prefix error")
// index
prefix[IndexMetaPrefix] = []string{"index-meta"}
_, err = NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "RootCoord UnmarshalText pb.IndexInfo err:line 1.0: unknown field name \"index-meta\" in milvus.proto.etcd.IndexInfo")
prefix[IndexMetaPrefix] = []string{proto.MarshalTextString(&pb.IndexInfo{})}
value, _ = proto.Marshal(&pb.IndexInfo{})
prefix[IndexMetaPrefix] = []string{string(value)}
m1, err := NewMetaTable(k1)
assert.NotNil(t, err)
assert.EqualError(t, err, "load prefix error")

View File

@ -56,7 +56,7 @@ import (
// DdOperation used to save ddMsg into ETCD
type DdOperation struct {
Body string `json:"body"`
Body []byte `json:"body"`
Type string `json:"type"`
}
@ -1006,7 +1006,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
switch ddOp.Type {
case CreateCollectionDDType:
var ddReq = internalpb.CreateCollectionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
return err
}
collInfo, err := c.MetaTable.GetCollectionByName(ddReq.CollectionName, 0)
@ -1019,7 +1019,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
invalidateCache = false
case DropCollectionDDType:
var ddReq = internalpb.DropCollectionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp
@ -1034,7 +1034,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
invalidateCache = true
case CreatePartitionDDType:
var ddReq = internalpb.CreatePartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp
@ -1052,7 +1052,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
invalidateCache = true
case DropPartitionDDType:
var ddReq = internalpb.DropPartitionRequest{}
if err = proto.UnmarshalText(ddOp.Body, &ddReq); err != nil {
if err = proto.Unmarshal(ddOp.Body, &ddReq); err != nil {
return err
}
ts = ddReq.Base.Timestamp

View File

@ -730,7 +730,7 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, CreateCollectionDDType, ddOp.Type)
var ddCollReq = internalpb.CreateCollectionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddCollReq)
err = proto.Unmarshal(ddOp.Body, &ddCollReq)
assert.Nil(t, err)
assert.Equal(t, createMeta.ID, ddCollReq.CollectionID)
assert.Equal(t, createMeta.PartitionIDs[0], ddCollReq.PartitionID)
@ -900,7 +900,7 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, CreatePartitionDDType, ddOp.Type)
var ddReq = internalpb.CreatePartitionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, collMeta.PartitionIDs[1], ddReq.PartitionID)
@ -1235,7 +1235,7 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, DropPartitionDDType, ddOp.Type)
var ddReq = internalpb.DropPartitionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)
assert.Equal(t, dropPartID, ddReq.PartitionID)
@ -1325,7 +1325,7 @@ func TestRootCoord(t *testing.T) {
assert.Equal(t, DropCollectionDDType, ddOp.Type)
var ddReq = internalpb.DropCollectionRequest{}
err = proto.UnmarshalText(ddOp.Body, &ddReq)
err = proto.Unmarshal(ddOp.Body, &ddReq)
assert.Nil(t, err)
assert.Equal(t, collMeta.ID, ddReq.CollectionID)

View File

@ -72,9 +72,12 @@ func GetFieldSchemaByIndexID(coll *etcdpb.CollectionInfo, idxID typeutil.UniqueI
// EncodeDdOperation serialize DdOperation into string
func EncodeDdOperation(m proto.Message, ddType string) (string, error) {
mStr := proto.MarshalTextString(m)
mByte, err := proto.Marshal(m)
if err != nil {
return "", err
}
ddOp := DdOperation{
Body: mStr,
Body: mByte,
Type: ddType,
}
ddOpByte, err := json.Marshal(ddOp)