Add index params serde to IndexCodec

Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
sunby 2021-01-28 16:41:24 +08:00 committed by yefu.chen
parent bcd7368533
commit 2be8cc1c4b
4 changed files with 40 additions and 11 deletions

View File

@ -106,7 +106,6 @@ func CreateServer(ctx context.Context) (*Server, error) {
registerFinishCh: ch,
cluster: newDataNodeCluster(ch),
}
s.state.Store(internalpb2.StateCode_INITIALIZING)
return s, nil
}
@ -115,6 +114,7 @@ func (s *Server) SetMasterClient(masterClient MasterClient) {
}
func (s *Server) Init() error {
s.state.Store(internalpb2.StateCode_INITIALIZING)
return nil
}

View File

@ -247,7 +247,7 @@ func (it *IndexBuildTask) Execute() error {
}
var indexCodec storage.IndexCodec
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs))
serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams)
if err != nil {
return err
}

View File

@ -1,6 +1,7 @@
package storage
import (
"encoding/json"
"fmt"
"sort"
"strconv"
@ -14,8 +15,9 @@ import (
)
const (
Ts = "ts"
DDL = "ddl"
Ts = "ts"
DDL = "ddl"
indexParamsFile = "indexParams"
)
type (
@ -633,10 +635,29 @@ func NewIndexCodec() *IndexCodec {
return &IndexCodec{}
}
func (indexCodec *IndexCodec) Serialize(blobs []*Blob) ([]*Blob, error) {
func (indexCodec *IndexCodec) Serialize(blobs []*Blob, params map[string]string) ([]*Blob, error) {
paramsBytes, err := json.Marshal(params)
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{Key: indexParamsFile, Value: paramsBytes})
return blobs, nil
}
func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, error) {
return blobs, nil
func (indexCodec *IndexCodec) Deserialize(blobs []*Blob) ([]*Blob, map[string]string, error) {
var params map[string]string
for i := 0; i < len(blobs); i++ {
if blobs[i].Key != indexParamsFile {
continue
}
if err := json.Unmarshal(blobs[i].Value, &params); err != nil {
return nil, nil, err
}
blobs = append(blobs[:i], blobs[i+1:]...)
break
}
if params == nil {
return nil, nil, errors.New("can not find params blob")
}
return blobs, params, nil
}

View File

@ -307,10 +307,18 @@ func TestIndexCodec(t *testing.T) {
[]byte{8, 8, 8, 8, 8, 8, 8, 8, 2, 3, 4, 5, 6, 7},
},
}
blobsInput, err := indexCodec.Serialize(blobs)
indexParams := map[string]string{
"k1": "v1", "k2": "v2",
}
blobsInput, err := indexCodec.Serialize(blobs, indexParams)
assert.Nil(t, err)
assert.Equal(t, blobs, blobsInput)
blobsOutput, err := indexCodec.Deserialize(blobs)
assert.EqualValues(t, 4, len(blobsInput))
assert.EqualValues(t, indexParamsFile, blobsInput[3])
blobsOutput, indexParamsOutput, err := indexCodec.Deserialize(blobsInput)
assert.Nil(t, err)
assert.Equal(t, blobsOutput, blobsInput)
assert.EqualValues(t, 3, len(blobsOutput))
for i := 0; i < 3; i++ {
assert.EqualValues(t, blobs[i], blobsOutput[i])
}
assert.EqualValues(t, indexParams, indexParamsOutput)
}