// Copyright (C) 2019-2020 Zilliz. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. package indexnode import ( "context" "errors" "runtime" "strconv" "time" "github.com/milvus-io/milvus/internal/util/retry" "github.com/golang/protobuf/proto" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/funcutil" ) const ( paramsKeyToParse = "params" IndexBuildTaskName = "IndexBuildTask" ) type task interface { Ctx() context.Context ID() UniqueID // return ReqID Name() string SetID(uid UniqueID) // set ReqID PreExecute(ctx context.Context) error Execute(ctx context.Context) error PostExecute(ctx context.Context) error WaitToFinish() error Notify(err error) OnEnqueue() error SetError(err error) } type BaseTask struct { done chan error ctx context.Context id UniqueID err error } func (bt *BaseTask) SetError(err error) { bt.err = err } func (bt *BaseTask) ID() UniqueID { return bt.id } func (bt *BaseTask) setID(id UniqueID) { bt.id = id } func (bt *BaseTask) WaitToFinish() error { select { case <-bt.ctx.Done(): return errors.New("timeout") case err := <-bt.done: return err } } func (bt *BaseTask) Notify(err error) { bt.done <- err } type IndexBuildTask struct { BaseTask index Index kv kv.BaseKV etcdKV *etcdkv.EtcdKV savePaths []string req *indexpb.CreateIndexRequest nodeID UniqueID } func (it *IndexBuildTask) Ctx() context.Context { return it.ctx } func (it *IndexBuildTask) ID() UniqueID { return it.id } func (it *IndexBuildTask) SetID(ID UniqueID) { it.BaseTask.setID(ID) } func (bt *BaseTask) Name() string { return IndexBuildTaskName } func (it *IndexBuildTask) OnEnqueue() error { it.SetID(it.req.IndexBuildID) log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("TaskID", it.ID())) return nil } func (it *IndexBuildTask) checkIndexMeta(pre bool) error { fn := func() error { indexMeta := indexpb.IndexMeta{} _, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath) if err != nil { log.Debug("IndexNode checkIndexMeta", zap.Any("load meta error with path", it.req.MetaPath), zap.Error(err), zap.Any("pre", pre)) return err } log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre)) err = proto.UnmarshalText(values[0], &indexMeta) if err != nil { log.Debug("IndexNode checkIndexMeta Unmarshal", zap.Error(err)) return err } log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta)) if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { log.Debug("IndexNode checkIndexMeta Notify build index this version is not the latest version", zap.Any("version", it.req.Version)) return nil } if indexMeta.MarkDeleted { indexMeta.State = commonpb.IndexState_Finished v := proto.MarshalTextString(&indexMeta) err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], v) if err != nil { return err } return nil } if pre { return nil } indexMeta.IndexFilePaths = it.savePaths indexMeta.State = commonpb.IndexState_Finished if it.err != nil { indexMeta.State = commonpb.IndexState_Failed } err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], proto.MarshalTextString(&indexMeta)) log.Debug("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err)) return err } err := retry.Retry(3, time.Millisecond*200, fn) log.Debug("IndexNode checkIndexMeta final", zap.Error(err)) return err } func (it *IndexBuildTask) PreExecute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask preExecute...") return it.checkIndexMeta(true) } func (it *IndexBuildTask) PostExecute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask PostExecute...") return it.checkIndexMeta(false) } func (it *IndexBuildTask) Execute(ctx context.Context) error { log.Debug("IndexNode IndexBuildTask Execute ...") var err error typeParams := make(map[string]string) for _, kvPair := range it.req.GetTypeParams() { key, value := kvPair.GetKey(), kvPair.GetValue() _, ok := typeParams[key] if ok { return errors.New("duplicated key in type params") } if key == paramsKeyToParse { params, err := funcutil.ParseIndexParamsMap(value) if err != nil { return err } for pk, pv := range params { typeParams[pk] = pv } } else { typeParams[key] = value } } indexParams := make(map[string]string) for _, kvPair := range it.req.GetIndexParams() { key, value := kvPair.GetKey(), kvPair.GetValue() _, ok := indexParams[key] if ok { return errors.New("duplicated key in index params") } if key == paramsKeyToParse { params, err := funcutil.ParseIndexParamsMap(value) if err != nil { return err } for pk, pv := range params { indexParams[pk] = pv } } else { indexParams[key] = value } } it.index, err = NewCIndex(typeParams, indexParams) if err != nil { log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", zap.Error(err)) return err } defer func() { err = it.index.Delete() if err != nil { log.Warn("IndexNode IndexBuildTask Execute CIndexDelete Failed", zap.Error(err)) } }() getKeyByPathNaive := func(path string) string { // splitElements := strings.Split(path, "/") // return splitElements[len(splitElements)-1] return path } getValueByPath := func(path string) ([]byte, error) { data, err := it.kv.Load(path) if err != nil { return nil, err } return []byte(data), nil } getBlobByPath := func(path string) (*Blob, error) { value, err := getValueByPath(path) if err != nil { return nil, err } return &Blob{ Key: getKeyByPathNaive(path), Value: value, }, nil } getStorageBlobs := func(blobs []*Blob) []*storage.Blob { return blobs } toLoadDataPaths := it.req.GetDataPaths() keys := make([]string, len(toLoadDataPaths)) blobs := make([]*Blob, len(toLoadDataPaths)) loadKey := func(idx int) error { keys[idx] = getKeyByPathNaive(toLoadDataPaths[idx]) blob, err := getBlobByPath(toLoadDataPaths[idx]) if err != nil { return err } blobs[idx] = blob return nil } err = funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.NumCPU(), loadKey, "loadKey") if err != nil { return err } storageBlobs := getStorageBlobs(blobs) var insertCodec storage.InsertCodec defer insertCodec.Close() partitionID, segmentID, insertData, err2 := insertCodec.Deserialize(storageBlobs) if err2 != nil { return err2 } if len(insertData.Data) != 1 { return errors.New("we expect only one field in deserialized insert data") } for _, value := range insertData.Data { // TODO: BinaryVectorFieldData floatVectorFieldData, fOk := value.(*storage.FloatVectorFieldData) if fOk { err = it.index.BuildFloatVecIndexWithoutIds(floatVectorFieldData.Data) if err != nil { log.Error("IndexNode BuildFloatVecIndexWithoutIds failed", zap.Error(err)) return err } } binaryVectorFieldData, bOk := value.(*storage.BinaryVectorFieldData) if bOk { err = it.index.BuildBinaryVecIndexWithoutIds(binaryVectorFieldData.Data) if err != nil { log.Error("IndexNode BuildBinaryVecIndexWithoutIds failed", zap.Error(err)) return err } } if !fOk && !bOk { return errors.New("we expect FloatVectorFieldData or BinaryVectorFieldData") } indexBlobs, err := it.index.Serialize() if err != nil { log.Error("IndexNode index Serialize failed", zap.Error(err)) return err } var indexCodec storage.IndexCodec serializedIndexBlobs, err := indexCodec.Serialize(getStorageBlobs(indexBlobs), indexParams, it.req.IndexName, it.req.IndexID) if err != nil { return err } getSavePathByKey := func(key string) string { // TODO: fix me, use more reasonable method return strconv.Itoa(int(it.req.IndexBuildID)) + "/" + strconv.Itoa(int(it.req.Version)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key } saveBlob := func(path string, value []byte) error { return it.kv.Save(path, string(value)) } it.savePaths = make([]string, len(serializedIndexBlobs)) saveIndexFile := func(idx int) error { blob := serializedIndexBlobs[idx] key, value := blob.Key, blob.Value savePath := getSavePathByKey(key) saveIndexFileFn := func() error { v, err := it.etcdKV.Load(it.req.MetaPath) if err != nil { log.Debug("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err)) return err } indexMeta := indexpb.IndexMeta{} err = proto.UnmarshalText(v, &indexMeta) if err != nil { log.Debug("IndexNode Unmarshal indexMeta error ", zap.Error(err)) return err } log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta)) if indexMeta.Version > it.req.Version { log.Debug("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version), zap.Any("indexMeta.Version", indexMeta.Version)) return errors.New("This task has been reassigned ") } return saveBlob(savePath, value) } err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn) log.Debug("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath)) if err != nil { return err } it.savePaths[idx] = savePath return nil } err = funcutil.ProcessFuncParallel(len(serializedIndexBlobs), runtime.NumCPU(), saveIndexFile, "saveIndexFile") if err != nil { return err } } // err = it.index.Delete() // if err != nil { // log.Print("CIndexDelete Failed") // } return nil }