From 5bcabffdafe6e433844eaade12357fd477d6b1d5 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 16 Nov 2020 21:10:43 +0800 Subject: [PATCH] Replace conf struct with ParamsTable Signed-off-by: cai.zhang --- cmd/master/main.go | 33 +--- cmd/proxy/proxy.go | 13 +- cmd/reader/reader.go | 12 +- cmd/storage/benchmark.go | 4 +- configs/config.yaml | 2 +- internal/allocator/id_allocator.go | 8 +- internal/allocator/timestamp_allocator.go | 13 +- internal/kv/etcd_kv.go | 31 +++- internal/kv/etcd_kv_test.go | 163 ++++++++---------- internal/{util/kvutil => kv}/kv.go | 2 +- internal/{util/kvutil => kv}/mem_kv.go | 2 +- internal/kv/mockkv/mock_etcd.go | 10 +- internal/master/collection/collection.go | 3 +- internal/master/collection_task.go | 3 +- internal/master/collection_task_test.go | 14 +- internal/master/controller/collection.go | 8 +- internal/master/controller/segment.go | 16 +- internal/master/controller/segment_test.go | 14 +- internal/master/grpc_service_test.go | 12 +- internal/master/id/id.go | 6 +- internal/master/id/id_test.go | 14 +- internal/master/informer/pulsar.go | 15 +- internal/master/master.go | 27 +-- internal/master/meta_table_test.go | 42 ++--- internal/master/paramtable/paramtable.go | 45 +++++ internal/master/partition_task_test.go | 11 +- internal/master/tso/global_allocator.go | 6 +- internal/master/tso/global_allocator_test.go | 16 +- internal/master/tso/tso.go | 4 +- internal/proxy/paramtable.go | 15 ++ internal/proxy/proxy.go | 22 ++- internal/proxy/proxy_test.go | 25 ++- internal/proxy/timetick.go | 16 +- internal/proxy/timetick_test.go | 7 +- internal/reader/meta_service.go | 40 ++--- internal/reader/meta_service_test.go | 92 ++-------- internal/reader/paramtable.go | 35 ++++ internal/reader/query_node_test.go | 17 +- internal/reader/reader.go | 4 + internal/reader/segment_test.go | 6 +- internal/storage/internal/S3/S3_test.go | 4 +- internal/storage/internal/S3/s3_engine.go | 13 -- internal/storage/internal/S3/s3_store.go | 6 +- .../storage/internal/minio/minio_store.go | 13 +- internal/storage/internal/minio/minio_test.go | 4 +- internal/storage/internal/tikv/tikv_store.go | 20 +-- internal/storage/internal/tikv/tikv_test.go | 3 +- internal/storage/storage.go | 10 +- internal/storage/type/storagetype.go | 6 + internal/util/paramtable/paramtable.go | 138 +++++++++++++++ .../paramtable_test.go | 74 ++++---- internal/util/paramtableutil/kv.go | 20 --- internal/util/paramtableutil/paramtable.go | 66 ------- 53 files changed, 589 insertions(+), 616 deletions(-) rename internal/{util/kvutil => kv}/kv.go (96%) rename internal/{util/kvutil => kv}/mem_kv.go (99%) create mode 100644 internal/master/paramtable/paramtable.go create mode 100644 internal/proxy/paramtable.go create mode 100644 internal/reader/paramtable.go create mode 100644 internal/util/paramtable/paramtable.go rename internal/util/{paramtableutil => paramtable}/paramtable_test.go (57%) delete mode 100644 internal/util/paramtableutil/kv.go delete mode 100644 internal/util/paramtableutil/paramtable.go diff --git a/cmd/master/main.go b/cmd/master/main.go index cfcce4979c..e0814ea920 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -2,37 +2,26 @@ package main import ( "context" - "flag" "log" "os" "os/signal" - "strconv" "syscall" "github.com/zilliztech/milvus-distributed/internal/master" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "go.uber.org/zap" ) func main() { + master.Init() - var yamlFile string - flag.StringVar(&yamlFile, "yaml", "", "yaml file") - flag.Parse() - // flag.Usage() - log.Println("yaml file: ", yamlFile) - - err := gparams.GParams.LoadYaml(yamlFile) - if err != nil { - panic(err) - } // Creates server. ctx, cancel := context.WithCancel(context.Background()) - etcdAddress, _ := gparams.GParams.Load("etcd.address") - etcdPort, _ := gparams.GParams.Load("etcd.port") - etcdAddr := etcdAddress + ":" + etcdPort - etcdRootPath, _ := gparams.GParams.Load("etcd.rootpath") - svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddr}) + + etcdAddress, _ := masterParams.Params.EtcdAddress() + etcdRootPath, _ := masterParams.Params.EtcdRootPath() + + svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddress}) if err != nil { log.Print("create server failed", zap.Error(err)) } @@ -50,13 +39,7 @@ func main() { cancel() }() - masterPort, _ := gparams.GParams.Load("master.port") - grpcPort, err := strconv.ParseInt(masterPort, 10, 64) - if err != nil { - panic(err) - } - - if err := svr.Run(grpcPort); err != nil { + if err := svr.Run(int64(masterParams.Params.Port())); err != nil { log.Fatal("run server failed", zap.Error(err)) } diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 203cf1760a..9351990a2e 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -2,28 +2,17 @@ package main import ( "context" - "flag" "log" "os" "os/signal" "syscall" "github.com/zilliztech/milvus-distributed/internal/proxy" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.uber.org/zap" ) func main() { - var yamlFile string - flag.StringVar(&yamlFile, "yaml", "", "yaml file") - flag.Parse() - flag.Usage() - log.Println("yaml file: ", yamlFile) - - err := gparams.GParams.LoadYaml(yamlFile) - if err != nil { - panic(err) - } + proxy.Init() // Creates server. ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/reader/reader.go b/cmd/reader/reader.go index 4d377c50cf..cc8a48a01e 100644 --- a/cmd/reader/reader.go +++ b/cmd/reader/reader.go @@ -7,17 +7,13 @@ import ( "syscall" "github.com/zilliztech/milvus-distributed/internal/reader" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } + reader.Init() sc := make(chan os.Signal, 1) signal.Notify(sc, @@ -31,10 +27,8 @@ func main() { sig = <-sc cancel() }() - pulsarAddr, _ := gparams.GParams.Load("pulsar.address") - pulsarPort, _ := gparams.GParams.Load("pulsar.port") - pulsarAddr += ":" + pulsarPort - reader.StartQueryNode(ctx, pulsarAddr) + pulsarAddress, _ := reader.Params.PulsarAddress() + reader.StartQueryNode(ctx, pulsarAddress) switch sig { case syscall.SIGTERM: diff --git a/cmd/storage/benchmark.go b/cmd/storage/benchmark.go index c965db6872..6a7c06f9a2 100644 --- a/cmd/storage/benchmark.go +++ b/cmd/storage/benchmark.go @@ -188,7 +188,9 @@ func main() { if valueSize, err = bytefmt.ToBytes(sizeArg); err != nil { log.Fatalf("Invalid -z argument for object size: %v", err) } - store, err = storage.NewStore(context.Background(), storeType) + var option = storagetype.Option{TikvAddress: "localhost:2379", Type: storeType, BucketName: "zilliz-hz"} + + store, err = storage.NewStore(context.Background(), option) if err != nil { log.Fatalf("Error when creating storage " + err.Error()) } diff --git a/configs/config.yaml b/configs/config.yaml index 7973e34075..7ba130a2bc 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -14,7 +14,7 @@ master: port: 53100 pulsarmoniterinterval: 1 pulsartopic: "monitor-topic" - segmentthreshole: 1073741824 + segmentthreshold: 1073741824 proxyidlist: [0] querynodenum: 1 writenodenum: 1 diff --git a/internal/allocator/id_allocator.go b/internal/allocator/id_allocator.go index ee214a6d99..e52f7721d6 100644 --- a/internal/allocator/id_allocator.go +++ b/internal/allocator/id_allocator.go @@ -4,11 +4,8 @@ import ( "context" "fmt" "log" - "strconv" "time" - "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -22,10 +19,7 @@ type IDAllocator struct { idEnd UniqueID } -func NewIDAllocator(ctx context.Context) (*IDAllocator, error) { - masterAddr := conf.Config.Etcd.Address - masterAddr += ":" - masterAddr += strconv.FormatInt(int64(conf.Config.Master.Port), 10) +func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error) { ctx1, cancel := context.WithCancel(ctx) a := &IDAllocator{ diff --git a/internal/allocator/timestamp_allocator.go b/internal/allocator/timestamp_allocator.go index f37547d6a1..033f085431 100644 --- a/internal/allocator/timestamp_allocator.go +++ b/internal/allocator/timestamp_allocator.go @@ -4,14 +4,10 @@ import ( "context" "fmt" "log" - "strconv" "time" - "github.com/zilliztech/milvus-distributed/internal/conf" - - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type Timestamp = typeutil.Timestamp @@ -27,12 +23,7 @@ type TimestampAllocator struct { lastTsEnd Timestamp } -func NewTimestampAllocator(ctx context.Context) (*TimestampAllocator, error) { - - masterAddr := conf.Config.Etcd.Address - masterAddr += ":" - masterAddr += strconv.FormatInt(int64(conf.Config.Master.Port), 10) - +func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAllocator, error) { ctx1, cancel := context.WithCancel(ctx) a := &TimestampAllocator{ Allocator: Allocator{reqs: make(chan request, maxMergeRequests), diff --git a/internal/kv/etcd_kv.go b/internal/kv/etcd_kv.go index 9163f60296..c2da685e84 100644 --- a/internal/kv/etcd_kv.go +++ b/internal/kv/etcd_kv.go @@ -11,7 +11,7 @@ import ( ) const ( - requestTimeout = 10 * time.Second + RequestTimeout = 10 * time.Second ) type EtcdKV struct { @@ -31,10 +31,14 @@ func (kv *EtcdKV) Close() { kv.client.Close() } +func (kv *EtcdKV) GetPath(key string) string { + return path.Join(kv.rootPath, key) +} + func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { key = path.Join(kv.rootPath, key) log.Printf("LoadWithPrefix %s", key) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { @@ -51,7 +55,7 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { func (kv *EtcdKV) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Get(ctx, key) if err != nil { @@ -70,7 +74,7 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { ops = append(ops, clientv3.OpGet(path.Join(kv.rootPath, keyLoad))) } - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() resp, err := kv.client.Txn(ctx).If().Then(ops...).Commit() if err != nil { @@ -99,7 +103,7 @@ func (kv *EtcdKV) MultiLoad(keys []string) ([]string, error) { func (kv *EtcdKV) Save(key, value string) error { key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() _, err := kv.client.Put(ctx, key, value) return err @@ -111,16 +115,25 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error { ops = append(ops, clientv3.OpPut(path.Join(kv.rootPath, key), value)) } - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() return err } +func (kv *EtcdKV) RemoveWithPrefix(prefix string) error { + key := path.Join(kv.rootPath, prefix) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + + _, err := kv.client.Delete(ctx, key, clientv3.WithPrefix()) + return err +} + func (kv *EtcdKV) Remove(key string) error { key = path.Join(kv.rootPath, key) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() _, err := kv.client.Delete(ctx, key) @@ -133,7 +146,7 @@ func (kv *EtcdKV) MultiRemove(keys []string) error { ops = append(ops, clientv3.OpDelete(path.Join(kv.rootPath, key))) } - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() @@ -151,7 +164,7 @@ func (kv *EtcdKV) MultiSaveAndRemove(saves map[string]string, removals []string) } log.Printf("MultiSaveAndRemove") - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) defer cancel() _, err := kv.client.Txn(ctx).If().Then(ops...).Commit() diff --git a/internal/kv/etcd_kv_test.go b/internal/kv/etcd_kv_test.go index 6afc0882ab..61a2036f6f 100644 --- a/internal/kv/etcd_kv_test.go +++ b/internal/kv/etcd_kv_test.go @@ -1,64 +1,67 @@ -package kv +package kv_test import ( - "context" - "path" + "os" "testing" "github.com/stretchr/testify/assert" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" "go.etcd.io/etcd/clientv3" ) +var Params paramtable.BaseTable + +func TestMain(m *testing.M) { + Params.Init() + code := m.Run() + os.Exit(code) +} + func TestEtcdKV_Load(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootpath := "/etcd/test/root" - kv := NewEtcdKV(cli, rootpath) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel() + rootPath := "/etcd/test/root" + etcdKV := kv.NewEtcdKV(cli, rootPath) - defer kv.Close() - defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix()) + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") - err = kv.Save("abc", "123") + err = etcdKV.Save("abc", "123") assert.Nil(t, err) - err = kv.Save("abcd", "1234") + err = etcdKV.Save("abcd", "1234") assert.Nil(t, err) - val, err := kv.Load("abc") + val, err := etcdKV.Load("abc") assert.Nil(t, err) assert.Equal(t, val, "123") - keys, vals, err := kv.LoadWithPrefix("abc") + keys, vals, err := etcdKV.LoadWithPrefix("abc") assert.Nil(t, err) assert.Equal(t, len(keys), len(vals)) assert.Equal(t, len(keys), 2) - assert.Equal(t, keys[0], path.Join(kv.rootPath, "abc")) - assert.Equal(t, keys[1], path.Join(kv.rootPath, "abcd")) + assert.Equal(t, keys[0], etcdKV.GetPath("abc")) + assert.Equal(t, keys[1], etcdKV.GetPath("abcd")) assert.Equal(t, vals[0], "123") assert.Equal(t, vals[1], "1234") - err = kv.Save("key_1", "123") + err = etcdKV.Save("key_1", "123") assert.Nil(t, err) - err = kv.Save("key_2", "456") + err = etcdKV.Save("key_2", "456") assert.Nil(t, err) - err = kv.Save("key_3", "789") + err = etcdKV.Save("key_3", "789") assert.Nil(t, err) keys = []string{"key_1", "key_100"} - vals, err = kv.MultiLoad(keys) + vals, err = etcdKV.MultiLoad(keys) assert.NotNil(t, err) assert.Equal(t, len(vals), len(keys)) assert.Equal(t, vals[0], "123") @@ -66,7 +69,7 @@ func TestEtcdKV_Load(t *testing.T) { keys = []string{"key_1", "key_2"} - vals, err = kv.MultiLoad(keys) + vals, err = etcdKV.MultiLoad(keys) assert.Nil(t, err) assert.Equal(t, len(vals), len(keys)) assert.Equal(t, vals[0], "123") @@ -74,26 +77,21 @@ func TestEtcdKV_Load(t *testing.T) { } func TestEtcdKV_MultiSave(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootpath := "/etcd/test/root" - kv := NewEtcdKV(cli, rootpath) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel() + rootPath := "/etcd/test/root" + etcdKV := kv.NewEtcdKV(cli, rootPath) - defer kv.Close() - defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix()) + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") - err = kv.Save("key_1", "111") + err = etcdKV.Save("key_1", "111") assert.Nil(t, err) kvs := map[string]string{ @@ -101,63 +99,58 @@ func TestEtcdKV_MultiSave(t *testing.T) { "key_2": "456", } - err = kv.MultiSave(kvs) + err = etcdKV.MultiSave(kvs) assert.Nil(t, err) - val, err := kv.Load("key_1") + val, err := etcdKV.Load("key_1") assert.Nil(t, err) assert.Equal(t, val, "123") } func TestEtcdKV_Remove(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootpath := "/etcd/test/root" - kv := NewEtcdKV(cli, rootpath) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel() + rootPath := "/etcd/test/root" + etcdKV := kv.NewEtcdKV(cli, rootPath) - defer kv.Close() - defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix()) + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") - err = kv.Save("key_1", "123") + err = etcdKV.Save("key_1", "123") assert.Nil(t, err) - err = kv.Save("key_2", "456") + err = etcdKV.Save("key_2", "456") assert.Nil(t, err) - val, err := kv.Load("key_1") + val, err := etcdKV.Load("key_1") assert.Nil(t, err) assert.Equal(t, val, "123") // delete "key_1" - err = kv.Remove("key_1") + err = etcdKV.Remove("key_1") assert.Nil(t, err) - val, err = kv.Load("key_1") + val, err = etcdKV.Load("key_1") assert.Error(t, err) assert.Empty(t, val) - val, err = kv.Load("key_2") + val, err = etcdKV.Load("key_2") assert.Nil(t, err) assert.Equal(t, val, "456") - keys, vals, err := kv.LoadWithPrefix("key") + keys, vals, err := etcdKV.LoadWithPrefix("key") assert.Nil(t, err) assert.Equal(t, len(keys), len(vals)) assert.Equal(t, len(keys), 1) - assert.Equal(t, keys[0], path.Join(kv.rootPath, "key_2")) + assert.Equal(t, keys[0], etcdKV.GetPath("key_2")) assert.Equal(t, vals[0], "456") // MultiRemove - err = kv.Save("key_1", "111") + err = etcdKV.Save("key_1", "111") assert.Nil(t, err) kvs := map[string]string{ @@ -167,48 +160,44 @@ func TestEtcdKV_Remove(t *testing.T) { "key_4": "012", } - err = kv.MultiSave(kvs) + err = etcdKV.MultiSave(kvs) assert.Nil(t, err) - val, err = kv.Load("key_1") + val, err = etcdKV.Load("key_1") assert.Nil(t, err) assert.Equal(t, val, "123") - val, err = kv.Load("key_3") + val, err = etcdKV.Load("key_3") assert.Nil(t, err) assert.Equal(t, val, "789") keys = []string{"key_1", "key_2", "key_3"} - err = kv.MultiRemove(keys) + err = etcdKV.MultiRemove(keys) assert.Nil(t, err) - val, err = kv.Load("key_1") + val, err = etcdKV.Load("key_1") assert.Error(t, err) assert.Empty(t, val) } func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + + etcdAddr, err := Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) - rootpath := "/etcd/test/root" - kv := NewEtcdKV(cli, rootpath) - ctx, cancel := context.WithTimeout(context.TODO(), requestTimeout) - defer cancel() - defer kv.Close() - defer kv.client.Delete(ctx, rootpath, clientv3.WithPrefix()) + rootPath := "/etcd/test/root" + etcdKV := kv.NewEtcdKV(cli, rootPath) - err = kv.Save("key_1", "123") + defer etcdKV.Close() + defer etcdKV.RemoveWithPrefix("") + + err = etcdKV.Save("key_1", "123") assert.Nil(t, err) - err = kv.Save("key_2", "456") + err = etcdKV.Save("key_2", "456") assert.Nil(t, err) - err = kv.Save("key_3", "789") + err = etcdKV.Save("key_3", "789") assert.Nil(t, err) kvs := map[string]string{ @@ -218,15 +207,15 @@ func TestEtcdKV_MultiSaveAndRemove(t *testing.T) { keys := []string{"key_3"} - err = kv.MultiSaveAndRemove(kvs, keys) + err = etcdKV.MultiSaveAndRemove(kvs, keys) assert.Nil(t, err) - val, err := kv.Load("key_1") + val, err := etcdKV.Load("key_1") assert.Nil(t, err) assert.Equal(t, val, "111") - val, err = kv.Load("key_2") + val, err = etcdKV.Load("key_2") assert.Nil(t, err) assert.Equal(t, val, "444") - val, err = kv.Load("key_3") + val, err = etcdKV.Load("key_3") assert.Error(t, err) assert.Empty(t, val) } diff --git a/internal/util/kvutil/kv.go b/internal/kv/kv.go similarity index 96% rename from internal/util/kvutil/kv.go rename to internal/kv/kv.go index f35aa5b62e..ac40309984 100644 --- a/internal/util/kvutil/kv.go +++ b/internal/kv/kv.go @@ -1,4 +1,4 @@ -package kvutil +package kv type Base interface { Load(key string) (string, error) diff --git a/internal/util/kvutil/mem_kv.go b/internal/kv/mem_kv.go similarity index 99% rename from internal/util/kvutil/mem_kv.go rename to internal/kv/mem_kv.go index 60b35613be..0e03beb87f 100644 --- a/internal/util/kvutil/mem_kv.go +++ b/internal/kv/mem_kv.go @@ -1,4 +1,4 @@ -package kvutil +package kv import ( "sync" diff --git a/internal/kv/mockkv/mock_etcd.go b/internal/kv/mockkv/mock_etcd.go index 04a3b95a23..f5af8f9bca 100644 --- a/internal/kv/mockkv/mock_etcd.go +++ b/internal/kv/mockkv/mock_etcd.go @@ -1,14 +1,14 @@ package mockkv import ( - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" + "github.com/zilliztech/milvus-distributed/internal/kv" ) // use MemoryKV to mock EtcdKV -func NewEtcdKV() *kvutil.MemoryKV { - return kvutil.NewMemoryKV() +func NewEtcdKV() *kv.MemoryKV { + return kv.NewMemoryKV() } -func NewMemoryKV() *kvutil.MemoryKV { - return kvutil.NewMemoryKV() +func NewMemoryKV() *kv.MemoryKV { + return kv.NewMemoryKV() } diff --git a/internal/master/collection/collection.go b/internal/master/collection/collection.go index 4a617c26f6..b727ce39fc 100644 --- a/internal/master/collection/collection.go +++ b/internal/master/collection/collection.go @@ -3,12 +3,11 @@ package collection import ( "time" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/golang/protobuf/proto" jsoniter "github.com/json-iterator/go" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go index 7788f51181..dc63df5ccb 100644 --- a/internal/master/collection_task.go +++ b/internal/master/collection_task.go @@ -5,12 +5,11 @@ import ( "log" "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) type Timestamp = typeutil.Timestamp diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index 7ffd35e461..955551b9a7 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -7,29 +7,23 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) func TestMaster_CollectionTask(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } + Init() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr := "127.0.0.1:" + etcdPort + etcdAddr, _ := masterParams.Params.EtcdAddress() etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) diff --git a/internal/master/controller/collection.go b/internal/master/controller/collection.go index b132ae3e00..2d52cf8ea5 100644 --- a/internal/master/controller/collection.go +++ b/internal/master/controller/collection.go @@ -8,9 +8,9 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/master/id" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/master/segment" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -66,11 +66,7 @@ func WriteCollection2Datastore(collectionMeta *schemapb.CollectionSchema, kvbase time.Now(), fieldMetas, []UniqueID{sID}, []string{"default"}) cm := collection.GrpcMarshal(&c) - pulsarTopicNum, err := gparams.GParams.Load("pulsar.topicnum") - if err != nil { - panic(err) - } - topicNum, err := strconv.Atoi(pulsarTopicNum) + topicNum, err := masterParams.Params.TopicNum() if err != nil { panic(err) } diff --git a/internal/master/controller/segment.go b/internal/master/controller/segment.go index cd137c4a94..be8c686e33 100644 --- a/internal/master/controller/segment.go +++ b/internal/master/controller/segment.go @@ -8,28 +8,22 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/collection" "github.com/zilliztech/milvus-distributed/internal/master/id" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/master/segment" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) func ComputeCloseTime(ss internalpb.SegmentStats, kvbase *kv.EtcdKV) error { - masterSegmentThreshole, err := gparams.GParams.Load("master.segmentthreshole") - if err != nil { - panic(err) - } - segmentThreshole, err := strconv.ParseFloat(masterSegmentThreshole, 32) - if err != nil { - panic(err) - } - if int(ss.MemorySize) > int(segmentThreshole*0.8) { + masterParams.Params.InitParamTable() + segmentThreshold := masterParams.Params.SegmentThreshold() + if int(ss.MemorySize) > int(segmentThreshold*0.8) { currentTime := time.Now() //memRate := int(ss.MemoryRate) memRate := 1 // to do if memRate == 0 { memRate = 1 } - sec := int(segmentThreshole*0.2) / memRate + sec := int(segmentThreshold*0.2) / memRate data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegmentID))) if err != nil { return err diff --git a/internal/master/controller/segment_test.go b/internal/master/controller/segment_test.go index 79df4c3d1a..0431c7f111 100644 --- a/internal/master/controller/segment_test.go +++ b/internal/master/controller/segment_test.go @@ -5,26 +5,24 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/kv" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" ) func newKvBase() *kv.EtcdKV { - err := gparams.GParams.LoadYaml("config.yaml") + masterParams.Params.Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr := "127.0.0.1:" + etcdPort + cli, _ := clientv3.New(clientv3.Config{ Endpoints: []string{etcdAddr}, DialTimeout: 5 * time.Second, }) - etcdRootPath, err := gparams.GParams.Load("etcd.rootpath") + etcdRootPath, err := masterParams.Params.EtcdRootPath() if err != nil { panic(err) } diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index ff564f8f22..45b1523745 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -6,29 +6,25 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) func TestMaster_CreateCollection(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } + Init() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdPort, err := gparams.GParams.Load("etcd.port") + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - etcdAddr := "127.0.0.1:" + etcdPort - etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) diff --git a/internal/master/id/id.go b/internal/master/id/id.go index c068f5b39f..a0d7bda9ed 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -1,8 +1,8 @@ package id import ( + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/tso" - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -20,12 +20,12 @@ func Init(etcdAddr []string, rootPath string) { InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid")) } -func InitGlobalIDAllocator(key string, base kvutil.Base) { +func InitGlobalIDAllocator(key string, base kv.Base) { allocator = NewGlobalIDAllocator(key, base) allocator.Initialize() } -func NewGlobalIDAllocator(key string, base kvutil.Base) *GlobalIDAllocator { +func NewGlobalIDAllocator(key string, base kv.Base) *GlobalIDAllocator { return &GlobalIDAllocator{ allocator: tso.NewGlobalTSOAllocator(key, base), } diff --git a/internal/master/id/id_test.go b/internal/master/id/id_test.go index 6e60f684a5..4c2a2f8c33 100644 --- a/internal/master/id/id_test.go +++ b/internal/master/id/id_test.go @@ -6,25 +6,21 @@ import ( "github.com/stretchr/testify/assert" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) var GIdAllocator *GlobalIDAllocator func TestMain(m *testing.M) { - err := gparams.GParams.LoadYaml("config.yaml") + masterParams.Params.Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr := "127.0.0.1:" + etcdPort - GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) + exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/master/informer/pulsar.go b/internal/master/informer/pulsar.go index 7aea6fc54d..8c039569b0 100644 --- a/internal/master/informer/pulsar.go +++ b/internal/master/informer/pulsar.go @@ -5,21 +5,14 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" ) func NewPulsarClient() *PulsarClient { - pulsarAddr, err := gparams.GParams.Load("pulsar.address") - if err != nil { - panic(err) - } - pulsarPort, err := gparams.GParams.Load("pulsar.port") - if err != nil { - panic(err) - } - pulsarAddr = "pulsar://" + pulsarAddr + ":" + pulsarPort + pulsarAddress, _ := masterParams.Params.PulsarAddress() + pulsarAddress = "pulsar://" + pulsarAddress client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: pulsarAddr, + URL: pulsarAddress, OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) diff --git a/internal/master/master.go b/internal/master/master.go index a2a3851059..9fb054a729 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -21,10 +21,10 @@ import ( "github.com/zilliztech/milvus-distributed/internal/master/controller" "github.com/zilliztech/milvus-distributed/internal/master/id" "github.com/zilliztech/milvus-distributed/internal/master/informer" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) // Server is the pd server. @@ -72,15 +72,25 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { return kvBase } -func Init(etcdAddr []string, rootPath string) { +func Init() { rand.Seed(time.Now().UnixNano()) - id.Init(etcdAddr, rootPath) - tso.Init(etcdAddr, rootPath) + masterParams.Params.InitParamTable() + etcdAddr, err := masterParams.Params.EtcdAddress() + if err != nil { + panic(err) + } + rootPath, err := masterParams.Params.EtcdRootPath() + if err != nil { + panic(err) + } + id.Init([]string{etcdAddr}, rootPath) + tso.Init([]string{etcdAddr}, rootPath) + } // CreateServer creates the UNINITIALIZED pd server with given configuration. func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) { - Init(etcdAddr, kvRootPath) + //Init(etcdAddr, kvRootPath) etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) if err != nil { @@ -272,13 +282,8 @@ func (s *Master) pulsarLoop() { ctx, cancel := context.WithCancel(s.serverLoopCtx) - pulsarTopic, err := gparams.GParams.Load("master.pulsartopic") - if err != nil { - panic(err) - } - consumer, err := s.pc.Client.Subscribe(pulsar.ConsumerOptions{ - Topic: pulsarTopic, + Topic: masterParams.Params.PulsarToic(), SubscriptionName: "my-sub", Type: pulsar.Shared, }) diff --git a/internal/master/meta_table_test.go b/internal/master/meta_table_test.go index 6c5572e1a9..cc2e532a1e 100644 --- a/internal/master/meta_table_test.go +++ b/internal/master/meta_table_test.go @@ -7,22 +7,20 @@ import ( "github.com/stretchr/testify/assert" "github.com/zilliztech/milvus-distributed/internal/kv" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" ) func TestMetaTable_Collection(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") + Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -151,16 +149,14 @@ func TestMetaTable_Collection(t *testing.T) { } func TestMetaTable_DeletePartition(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -241,16 +237,14 @@ func TestMetaTable_DeletePartition(t *testing.T) { } func TestMetaTable_Segment(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") @@ -322,16 +316,14 @@ func TestMetaTable_Segment(t *testing.T) { } func TestMetaTable_UpdateSegment(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") + Init() + + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:" + etcdPort}}) + cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) etcdKV := kv.NewEtcdKV(cli, "/etcd/test/root") diff --git a/internal/master/paramtable/paramtable.go b/internal/master/paramtable/paramtable.go new file mode 100644 index 0000000000..6210e030cf --- /dev/null +++ b/internal/master/paramtable/paramtable.go @@ -0,0 +1,45 @@ +package paramtable + +import ( + "strconv" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable +} + +var Params ParamTable + +func (p *ParamTable) InitParamTable() { + p.Init() +} + +func (p *ParamTable) Address() string { + masterAddress, _ := p.Load("master.address") + return masterAddress +} + +func (p *ParamTable) Port() int { + masterPort, _ := p.Load("master.port") + port, err := strconv.Atoi(masterPort) + if err != nil { + panic(err) + } + return port +} + +func (p *ParamTable) PulsarToic() string { + pulsarTopic, _ := p.Load("master.pulsartopic") + return pulsarTopic +} + +func (p *ParamTable) SegmentThreshold() float64 { + threshole, _ := p.Load("master.segmentthreshold") + segmentThreshole, err := strconv.ParseFloat(threshole, 32) + if err != nil { + panic(err) + } + return segmentThreshole +} diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 2b5431479b..ff59c1be51 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -8,29 +8,26 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" "google.golang.org/grpc" ) func TestMaster_Partition(t *testing.T) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } + Init() + ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - etcdPort, err := gparams.GParams.Load("etcd.port") + etcdAddr, err := masterParams.Params.EtcdAddress() if err != nil { panic(err) } - etcdAddr := "127.0.0.1:" + etcdPort etcdCli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddr}}) assert.Nil(t, err) diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index d9bd6ceead..980af454f3 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -6,7 +6,7 @@ import ( "time" "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.uber.org/zap" @@ -41,13 +41,13 @@ func Init(etcdAddr []string, rootPath string) { InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso")) } -func InitGlobalTsoAllocator(key string, base kvutil.Base) { +func InitGlobalTsoAllocator(key string, base kv.Base) { allocator = NewGlobalTSOAllocator(key, base) allocator.Initialize() } // NewGlobalTSOAllocator creates a new global TSO allocator. -func NewGlobalTSOAllocator(key string, kvBase kvutil.Base) *GlobalTSOAllocator { +func NewGlobalTSOAllocator(key string, kvBase kv.Base) *GlobalTSOAllocator { var saveInterval = 3 * time.Second return &GlobalTSOAllocator{ tso: ×tampOracle{ diff --git a/internal/master/tso/global_allocator_test.go b/internal/master/tso/global_allocator_test.go index 70d318d471..4516456517 100644 --- a/internal/master/tso/global_allocator_test.go +++ b/internal/master/tso/global_allocator_test.go @@ -7,23 +7,19 @@ import ( "github.com/stretchr/testify/assert" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) var GTsoAllocator Allocator func TestMain(m *testing.M) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr := "127.0.0.1:" + etcdPort + masterParams.Params.Init() + etcdAddr, err := masterParams.Params.EtcdAddress() + if err != nil { + panic(err) + } GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) exitCode := m.Run() diff --git a/internal/master/tso/tso.go b/internal/master/tso/tso.go index 81f246deb1..02a99931bf 100644 --- a/internal/master/tso/tso.go +++ b/internal/master/tso/tso.go @@ -22,7 +22,7 @@ import ( "go.uber.org/zap" "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" + "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -47,7 +47,7 @@ type atomicObject struct { // timestampOracle is used to maintain the logic of tso. type timestampOracle struct { key string - kvBase kvutil.Base + kvBase kv.Base // TODO: remove saveInterval saveInterval time.Duration diff --git a/internal/proxy/paramtable.go b/internal/proxy/paramtable.go new file mode 100644 index 0000000000..e4f224b86e --- /dev/null +++ b/internal/proxy/paramtable.go @@ -0,0 +1,15 @@ +package proxy + +import ( + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable +} + +var Params ParamTable + +func (p *ParamTable) InitParamTable() { + p.Init() +} diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 058894cc21..458bd58b64 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -5,12 +5,9 @@ import ( "log" "math/rand" "net" - "strconv" "sync" "time" - "github.com/zilliztech/milvus-distributed/internal/conf" - "google.golang.org/grpc" "github.com/zilliztech/milvus-distributed/internal/allocator" @@ -47,6 +44,10 @@ type Proxy struct { closeCallbacks []func() } +func Init() { + Params.InitParamTable() +} + func CreateProxy(ctx context.Context) (*Proxy, error) { rand.Seed(time.Now().UnixNano()) ctx1, cancel := context.WithCancel(ctx) @@ -79,14 +80,18 @@ func CreateProxy(ctx context.Context) (*Proxy, error) { unmarshal, bufSize) - idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx) + masterAddr, err := Params.MasterAddress() + if err != nil { + panic(err) + } + idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr) if err != nil { return nil, err } p.idAllocator = idAllocator - tsoAllocator, err := allocator.NewTimestampAllocator(p.proxyLoopCtx) + tsoAllocator, err := allocator.NewTimestampAllocator(p.proxyLoopCtx, masterAddr) if err != nil { return nil, err } @@ -147,9 +152,10 @@ func (p *Proxy) grpcLoop() { } func (p *Proxy) connectMaster() error { - masterHost := conf.Config.Master.Address - masterPort := conf.Config.Master.Port - masterAddr := masterHost + ":" + strconv.FormatInt(int64(masterPort), 10) + masterAddr, err := Params.MasterAddress() + if err != nil { + panic(err) + } log.Printf("Proxy connected to master, master_addr=%s", masterAddr) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 5b2e220fdb..c4ef84a4fc 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -14,14 +14,13 @@ import ( "google.golang.org/grpc" "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master" + masterParam "github.com/zilliztech/milvus-distributed/internal/master/paramtable" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) var ctx context.Context @@ -37,10 +36,14 @@ var masterServer *master.Master var testNum = 10 func startMaster(ctx context.Context) { - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - rootPath := conf.Config.Etcd.Rootpath + etcdAddr, err := Params.EtcdAddress() + if err != nil { + panic(err) + } + rootPath, err := Params.EtcdRootPath() + if err != nil { + panic(err) + } kvRootPath := path.Join(rootPath, "kv") metaRootPath := path.Join(rootPath, "meta") @@ -49,8 +52,8 @@ func startMaster(ctx context.Context) { if err != nil { log.Print("create server failed", zap.Error(err)) } - - if err := svr.Run(int64(conf.Config.Master.Port)); err != nil { + masterParam.Params.InitParamTable() + if err := svr.Run(int64(masterParam.Params.Port())); err != nil { log.Fatal("run server failed", zap.Error(err)) } @@ -73,11 +76,7 @@ func startProxy(ctx context.Context) { } func setup() { - conf.LoadConfig("config.yaml") - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } + Params.Init() ctx, cancel = context.WithCancel(context.Background()) startMaster(ctx) diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index 6ffb9e3c92..11d47ab9ad 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -3,18 +3,13 @@ package proxy import ( "context" "log" - "strconv" "sync" "time" - "github.com/zilliztech/milvus-distributed/internal/conf" - + "github.com/apache/pulsar-client-go/pulsar" "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/msgstream" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - - "github.com/apache/pulsar-client-go/pulsar" ) type tickCheckFunc = func(Timestamp) bool @@ -53,10 +48,11 @@ func newTimeTick(ctx context.Context, bufSize := int64(1000) t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, bufSize) - pulsarAddress := "pulsar://" - pulsarAddress += conf.Config.Pulsar.Address - pulsarAddress += ":" - pulsarAddress += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10) + pulsarAddress, err := Params.PulsarAddress() + if err != nil { + panic(err) + } + pulsarAddress = "pulsar://" + pulsarAddress producerChannels := []string{"timeTick"} t.tickMsgStream.SetPulsarCient(pulsarAddress) diff --git a/internal/proxy/timetick_test.go b/internal/proxy/timetick_test.go index 4dbcf1c693..387d47ff71 100644 --- a/internal/proxy/timetick_test.go +++ b/internal/proxy/timetick_test.go @@ -28,8 +28,11 @@ func TestTimeTick_Start(t *testing.T) { func TestTimeTick_Start2(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - - tsoAllocator, err := allocator.NewTimestampAllocator(ctx) + masterAddr, err := Params.MasterAddress() + if err != nil { + panic(err) + } + tsoAllocator, err := allocator.NewTimestampAllocator(ctx, masterAddr) assert.Nil(t, err) err = tsoAllocator.Start() assert.Nil(t, err) diff --git a/internal/reader/meta_service.go b/internal/reader/meta_service.go index 307df322a3..8af38ca830 100644 --- a/internal/reader/meta_service.go +++ b/internal/reader/meta_service.go @@ -14,7 +14,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" ) @@ -31,16 +30,12 @@ type metaService struct { } func newMetaService(ctx context.Context, container *container) *metaService { - ETCDAddr, err := gparams.GParams.Load("etcd.address") + ETCDAddr, err := Params.EtcdAddress() if err != nil { panic(err) } - ETCDPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - ETCDAddr = "http://" + ETCDAddr + ":" + ETCDPort - ETCDRootPath, err := gparams.GParams.Load("etcd.rootpath") + + ETCDRootPath, err := Params.EtcdRootPath() if err != nil { panic(err) } @@ -83,7 +78,7 @@ func (mService *metaService) start() { } func GetCollectionObjID(key string) string { - ETCDRootPath, err := gparams.GParams.Load("etcd.rootpath") + ETCDRootPath, err := Params.EtcdRootPath() if err != nil { panic(err) } @@ -92,7 +87,7 @@ func GetCollectionObjID(key string) string { } func GetSegmentObjID(key string) string { - ETCDRootPath, err := gparams.GParams.Load("etcd.rootpath") + ETCDRootPath, err := Params.EtcdRootPath() if err != nil { panic(err) } @@ -101,7 +96,7 @@ func GetSegmentObjID(key string) string { } func isCollectionObj(key string) bool { - ETCDRootPath, err := gparams.GParams.Load("etcd.rootpath") + ETCDRootPath, err := Params.EtcdRootPath() if err != nil { panic(err) } @@ -113,7 +108,7 @@ func isCollectionObj(key string) bool { } func isSegmentObj(key string) bool { - ETCDRootPath, err := gparams.GParams.Load("etcd.rootpath") + ETCDRootPath, err := Params.EtcdRootPath() if err != nil { panic(err) } @@ -130,24 +125,9 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b return false } - readerTopicStart, err := gparams.GParams.Load("reader.topicstart") - if err != nil { - panic(err) - } - TopicStart, err := strconv.Atoi(readerTopicStart) - if err != nil { - panic(err) - } - readerTopicEnd, err := gparams.GParams.Load("reader.topicend") - if err != nil { - panic(err) - } - TopicEnd, err := strconv.Atoi(readerTopicEnd) - if err != nil { - panic(err) - } - var queryNodeChannelStart = TopicStart - var queryNodeChannelEnd = TopicEnd + Params.Init() + var queryNodeChannelStart = Params.TopicStart() + var queryNodeChannelEnd = Params.TopicEnd() if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) { return true diff --git a/internal/reader/meta_service_test.go b/internal/reader/meta_service_test.go index 8c28e93882..4c366b3535 100644 --- a/internal/reader/meta_service_test.go +++ b/internal/reader/meta_service_test.go @@ -6,22 +6,16 @@ import ( "testing" "time" - gParams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" - "github.com/golang/protobuf/proto" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" ) func TestMetaService_start(t *testing.T) { var ctx context.Context - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) + Params.Init() if closeWithDeadline { var cancel context.CancelFunc @@ -41,10 +35,7 @@ func TestMetaService_start(t *testing.T) { } func TestMetaService_getCollectionObjId(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var key = "/collection/collection0" var collectionObjID1 = GetCollectionObjID(key) @@ -58,10 +49,7 @@ func TestMetaService_getCollectionObjId(t *testing.T) { } func TestMetaService_getSegmentObjId(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var key = "/segment/segment0" var segmentObjID1 = GetSegmentObjID(key) @@ -75,10 +63,7 @@ func TestMetaService_getSegmentObjId(t *testing.T) { } func TestMetaService_isCollectionObj(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var key = "by-dev/collection/collection0" var b1 = isCollectionObj(key) @@ -92,10 +77,7 @@ func TestMetaService_isCollectionObj(t *testing.T) { } func TestMetaService_isSegmentObj(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var key = "by-dev/segment/segment0" var b1 = isSegmentObj(key) @@ -109,10 +91,7 @@ func TestMetaService_isSegmentObj(t *testing.T) { } func TestMetaService_isSegmentChannelRangeInQueryNodeChannelRange(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var s = etcdpb.SegmentMeta{ SegmentID: UniqueID(0), @@ -201,7 +180,7 @@ func TestMetaService_printSegmentStruct(t *testing.T) { } func TestMetaService_processCollectionCreate(t *testing.T) { - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -247,10 +226,7 @@ func TestMetaService_processCollectionCreate(t *testing.T) { } func TestMetaService_processSegmentCreate(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -323,10 +299,7 @@ func TestMetaService_processSegmentCreate(t *testing.T) { } func TestMetaService_processCreate(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -383,10 +356,7 @@ func TestMetaService_processCreate(t *testing.T) { } func TestMetaService_processSegmentModify(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -470,10 +440,7 @@ func TestMetaService_processSegmentModify(t *testing.T) { } func TestMetaService_processCollectionModify(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -547,10 +514,7 @@ func TestMetaService_processCollectionModify(t *testing.T) { } func TestMetaService_processModify(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -649,10 +613,7 @@ func TestMetaService_processModify(t *testing.T) { } func TestMetaService_processSegmentDelete(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -728,10 +689,7 @@ func TestMetaService_processSegmentDelete(t *testing.T) { } func TestMetaService_processCollectionDelete(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -780,10 +738,7 @@ func TestMetaService_processCollectionDelete(t *testing.T) { } func TestMetaService_processDelete(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) @@ -847,10 +802,7 @@ func TestMetaService_processDelete(t *testing.T) { } func TestMetaService_processResp(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var ctx context.Context if closeWithDeadline { @@ -878,10 +830,7 @@ func TestMetaService_processResp(t *testing.T) { } func TestMetaService_loadCollections(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var ctx context.Context if closeWithDeadline { @@ -903,10 +852,7 @@ func TestMetaService_loadCollections(t *testing.T) { } func TestMetaService_loadSegments(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) - - conf.LoadConfig("config.yaml") + Params.Init() var ctx context.Context if closeWithDeadline { diff --git a/internal/reader/paramtable.go b/internal/reader/paramtable.go new file mode 100644 index 0000000000..c60b5b2f87 --- /dev/null +++ b/internal/reader/paramtable.go @@ -0,0 +1,35 @@ +package reader + +import ( + "strconv" + + "github.com/zilliztech/milvus-distributed/internal/util/paramtable" +) + +type ParamTable struct { + paramtable.BaseTable +} + +var Params ParamTable + +func (p *ParamTable) InitParamTable() { + p.Init() +} + +func (p *ParamTable) TopicStart() int { + topicStart, _ := p.Load("reader.topicstart") + topicStartNum, err := strconv.Atoi(topicStart) + if err != nil { + panic(err) + } + return topicStartNum +} + +func (p *ParamTable) TopicEnd() int { + topicEnd, _ := p.Load("reader.topicend") + topicEndNum, err := strconv.Atoi(topicEnd) + if err != nil { + panic(err) + } + return topicEndNum +} diff --git a/internal/reader/query_node_test.go b/internal/reader/query_node_test.go index 7b74c3d42c..b7de266460 100644 --- a/internal/reader/query_node_test.go +++ b/internal/reader/query_node_test.go @@ -4,9 +4,6 @@ import ( "context" "testing" "time" - - "github.com/stretchr/testify/assert" - gParams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) const ctxTimeInMillisecond = 2000 @@ -14,8 +11,7 @@ const closeWithDeadline = true // NOTE: start pulsar and etcd before test func TestQueryNode_start(t *testing.T) { - err := gParams.GParams.LoadYaml("config.yaml") - assert.NoError(t, err) + Params.Init() var ctx context.Context if closeWithDeadline { @@ -27,11 +23,10 @@ func TestQueryNode_start(t *testing.T) { ctx = context.Background() } - pulsarAddr, _ := gParams.GParams.Load("pulsar.address") - pulsarPort, _ := gParams.GParams.Load("pulsar.port") - pulsarAddr += ":" + pulsarPort - pulsarAddr = "pulsar://" + pulsarAddr - - node := NewQueryNode(ctx, 0, pulsarAddr) + pulsarAddr, err := Params.PulsarAddress() + if err != nil { + panic(err) + } + node := NewQueryNode(ctx, 0, "pulsar://"+pulsarAddr) node.Start() } diff --git a/internal/reader/reader.go b/internal/reader/reader.go index 39b328dffe..d4d6b7a5b6 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -4,6 +4,10 @@ import ( "context" ) +func Init() { + Params.Init() +} + func StartQueryNode(ctx context.Context, pulsarURL string) { node := NewQueryNode(ctx, 0, pulsarURL) diff --git a/internal/reader/segment_test.go b/internal/reader/segment_test.go index dcc8147802..8e679ddbbd 100644 --- a/internal/reader/segment_test.go +++ b/internal/reader/segment_test.go @@ -6,12 +6,10 @@ import ( "testing" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - - "github.com/stretchr/testify/assert" ) //-------------------------------------------------------------------------------------- constructor and destructor diff --git a/internal/storage/internal/S3/S3_test.go b/internal/storage/internal/S3/S3_test.go index a71698927d..c565f4a5a1 100644 --- a/internal/storage/internal/S3/S3_test.go +++ b/internal/storage/internal/S3/S3_test.go @@ -5,10 +5,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type" ) +var option = storagetype.Option{BucketName: "zilliz-hz"} var ctx = context.Background() -var client, err = NewS3Driver(ctx) +var client, err = NewS3Driver(ctx, option) func TestS3Driver_PutRowAndGetRow(t *testing.T) { err = client.PutRow(ctx, []byte("bar"), []byte("abcdefghijklmnoopqrstuvwxyz"), "SegmentA", 1) diff --git a/internal/storage/internal/S3/s3_engine.go b/internal/storage/internal/S3/s3_engine.go index 8d391a95fa..8034d679e7 100644 --- a/internal/storage/internal/S3/s3_engine.go +++ b/internal/storage/internal/S3/s3_engine.go @@ -3,18 +3,14 @@ package s3driver import ( "bytes" "context" - "fmt" "io" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" . "github.com/zilliztech/milvus-distributed/internal/storage/type" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) -var bucketName string - type S3Store struct { client *s3.S3 } @@ -22,15 +18,6 @@ type S3Store struct { func NewS3Store(config aws.Config) (*S3Store, error) { sess := session.Must(session.NewSession(&config)) service := s3.New(sess) - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - bucketName, err := gparams.GParams.Load("writer.bucket") - if err != nil { - panic(err) - } - fmt.Println(bucketName) return &S3Store{ client: service, diff --git a/internal/storage/internal/S3/s3_store.go b/internal/storage/internal/S3/s3_store.go index ac9a28e924..19199baa54 100644 --- a/internal/storage/internal/S3/s3_store.go +++ b/internal/storage/internal/S3/s3_store.go @@ -13,9 +13,13 @@ type S3Driver struct { driver *S3Store } -func NewS3Driver(ctx context.Context) (*S3Driver, error) { +var bucketName string + +func NewS3Driver(ctx context.Context, option Option) (*S3Driver, error) { // to-do read conf + bucketName = option.BucketName + S3Client, err := NewS3Store(aws.Config{ Region: aws.String(endpoints.CnNorthwest1RegionID)}) diff --git a/internal/storage/internal/minio/minio_store.go b/internal/storage/internal/minio/minio_store.go index ea8983be34..18e2512401 100644 --- a/internal/storage/internal/minio/minio_store.go +++ b/internal/storage/internal/minio/minio_store.go @@ -7,7 +7,6 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" "github.com/zilliztech/milvus-distributed/internal/storage/internal/minio/codec" storageType "github.com/zilliztech/milvus-distributed/internal/storage/type" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" ) type MinioDriver struct { @@ -16,21 +15,15 @@ type MinioDriver struct { var bucketName string -func NewMinioDriver(ctx context.Context) (*MinioDriver, error) { +func NewMinioDriver(ctx context.Context, option storageType.Option) (*MinioDriver, error) { // to-do read conf var endPoint = "localhost:9000" var accessKeyID = "testminio" var secretAccessKey = "testminio" var useSSL = false - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - bucketName, err := gparams.GParams.Load("writer.bucket") - if err != nil { - panic(err) - } + bucketName := option.BucketName + minioClient, err := minio.New(endPoint, &minio.Options{ Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Secure: useSSL, diff --git a/internal/storage/internal/minio/minio_test.go b/internal/storage/internal/minio/minio_test.go index 2029158039..d98a98cfea 100644 --- a/internal/storage/internal/minio/minio_test.go +++ b/internal/storage/internal/minio/minio_test.go @@ -5,10 +5,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type" ) +var option = storagetype.Option{BucketName: "zilliz-hz"} var ctx = context.Background() -var client, err = NewMinioDriver(ctx) +var client, err = NewMinioDriver(ctx, option) func TestMinioDriver_PutRowAndGetRow(t *testing.T) { err = client.PutRow(ctx, []byte("bar"), []byte("abcdefghijklmnoopqrstuvwxyz"), "SegmentA", 1) diff --git a/internal/storage/internal/tikv/tikv_store.go b/internal/storage/internal/tikv/tikv_store.go index 87db3a5ce9..5ecf8936f6 100644 --- a/internal/storage/internal/tikv/tikv_store.go +++ b/internal/storage/internal/tikv/tikv_store.go @@ -10,7 +10,7 @@ import ( "github.com/tikv/client-go/rawkv" . "github.com/zilliztech/milvus-distributed/internal/storage/internal/tikv/codec" . "github.com/zilliztech/milvus-distributed/internal/storage/type" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" + storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type" ) func keyAddOne(key Key) Key { @@ -87,22 +87,10 @@ type TikvStore struct { engine *tikvEngine } -func NewTikvStore(ctx context.Context) (*TikvStore, error) { - err := gparams.GParams.LoadYaml("config.yaml") - if err != nil { - panic(err) - } - pdAddress, err := gparams.GParams.Load("storage.address") - if err != nil { - panic(err) - } - pdPort, err := gparams.GParams.Load("storage.port") - if err != nil { - panic(err) - } - pdAddress = pdAddress + ":" + pdPort +func NewTikvStore(ctx context.Context, option storagetype.Option) (*TikvStore, error) { + conf := config.Default() - client, err := rawkv.NewClient(ctx, []string{pdAddress}, conf) + client, err := rawkv.NewClient(ctx, []string{option.TikvAddress}, conf) if err != nil { return nil, err } diff --git a/internal/storage/internal/tikv/tikv_test.go b/internal/storage/internal/tikv/tikv_test.go index d473d54db0..4e69d14d2c 100644 --- a/internal/storage/internal/tikv/tikv_test.go +++ b/internal/storage/internal/tikv/tikv_test.go @@ -17,9 +17,10 @@ import ( //var store TikvStore var store *TikvStore +var option = Option{TikvAddress: "localhost:2379"} func TestMain(m *testing.M) { - store, _ = NewTikvStore(context.Background()) + store, _ = NewTikvStore(context.Background(), option) exitCode := m.Run() _ = store.Close() os.Exit(exitCode) diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 0c3cb788c7..67e9e44e89 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -10,25 +10,25 @@ import ( storagetype "github.com/zilliztech/milvus-distributed/internal/storage/type" ) -func NewStore(ctx context.Context, driver storagetype.DriverType) (storagetype.Store, error) { +func NewStore(ctx context.Context, option storagetype.Option) (storagetype.Store, error) { var err error var store storagetype.Store - switch driver { + switch option.Type { case storagetype.TIKVDriver: - store, err = tikvDriver.NewTikvStore(ctx) + store, err = tikvDriver.NewTikvStore(ctx, option) if err != nil { panic(err.Error()) } return store, nil case storagetype.MinIODriver: - store, err = minIODriver.NewMinioDriver(ctx) + store, err = minIODriver.NewMinioDriver(ctx, option) if err != nil { //panic(err.Error()) return nil, err } return store, nil case storagetype.S3DRIVER: - store, err = S3Driver.NewS3Driver(ctx) + store, err = S3Driver.NewS3Driver(ctx, option) if err != nil { //panic(err.Error()) return nil, err diff --git a/internal/storage/type/storagetype.go b/internal/storage/type/storagetype.go index 266f05bfad..9549a106e5 100644 --- a/internal/storage/type/storagetype.go +++ b/internal/storage/type/storagetype.go @@ -13,6 +13,12 @@ type DriverType = string type SegmentIndex = []byte type SegmentDL = []byte +type Option struct { + Type DriverType + TikvAddress string + BucketName string +} + const ( MinIODriver DriverType = "MinIO" TIKVDriver DriverType = "TIKV" diff --git a/internal/util/paramtable/paramtable.go b/internal/util/paramtable/paramtable.go new file mode 100644 index 0000000000..9b8ceb43f5 --- /dev/null +++ b/internal/util/paramtable/paramtable.go @@ -0,0 +1,138 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package paramtable + +import ( + "fmt" + "path" + "runtime" + "strconv" + "strings" + + "github.com/zilliztech/milvus-distributed/internal/kv" + + "github.com/spf13/viper" +) + +type Base interface { + Load(key string) (string, error) + LoadRange(key, endKey string, limit int) ([]string, []string, error) + LoadYaml(fileName string) error + Remove(key string) error + Save(key, value string) error + Init() +} + +type BaseTable struct { + params *kv.MemoryKV +} + +func (gp *BaseTable) Init() { + gp.params = kv.NewMemoryKV() + err := gp.LoadYaml("config.yaml") + if err != nil { + panic(err) + } + + etcdAddress, _ := gp.Load("etcd.address") + etcdPort, _ := gp.Load("etcd.port") + etcdAddress += ":" + etcdPort + err = gp.Save("_EtcdAddress", etcdAddress) + if err != nil { + panic(err) + } + + pulsarAddress, _ := gp.Load("pulsar.address") + pulsarPort, _ := gp.Load("pulsar.port") + pulsarAddress += ":" + pulsarPort + err = gp.Save("_PulsarAddress", pulsarAddress) + if err != nil { + panic(err) + } + + masterAddress, _ := gp.Load("master.address") + masterPort, _ := gp.Load("master.port") + masterAddress += ":" + masterPort + err = gp.Save("_MasterAddress", masterAddress) + if err != nil { + panic(err) + } +} + +func (gp *BaseTable) Load(key string) (string, error) { + return gp.params.Load(strings.ToLower(key)) +} + +func (gp *BaseTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) { + return gp.params.LoadRange(strings.ToLower(key), strings.ToLower(endKey), limit) +} + +func (gp *BaseTable) LoadYaml(fileName string) error { + config := viper.New() + _, fpath, _, _ := runtime.Caller(0) + configPath := path.Dir(fpath) + "/../../../configs/" + config.SetConfigFile(configPath + fileName) + if err := config.ReadInConfig(); err != nil { + panic(err) + } + + for _, key := range config.AllKeys() { + fmt.Println(key) + err := gp.params.Save(strings.ToLower(key), config.GetString(key)) + if err != nil { + panic(err) + } + } + + return nil +} + +func (gp *BaseTable) Remove(key string) error { + return gp.params.Remove(strings.ToLower(key)) +} + +func (gp *BaseTable) Save(key, value string) error { + return gp.params.Save(strings.ToLower(key), value) +} + +func (gp *BaseTable) EtcdAddress() (string, error) { + return gp.Load("_EtcdAddress") +} + +func (gp *BaseTable) PulsarAddress() (string, error) { + return gp.Load("_PulsarAddress") +} + +func (gp *BaseTable) MasterAddress() (string, error) { + return gp.Load("_MasterAddress") +} + +func (gp *BaseTable) EtcdRootPath() (string, error) { + return gp.Load("etcd.rootpath") +} + +func (gp *BaseTable) TopicNum() (int, error) { + topicNum, _ := gp.Load("pulsar.topicnum") + return strconv.Atoi(topicNum) +} + +func (gp *BaseTable) StorageAddress() (string, error) { + storageAddress, _ := gp.Load("storage.address") + storagePort, _ := gp.Load("storage.address") + + return storageAddress + ":" + storagePort, nil +} + +func (gp *BaseTable) BucketName() string { + bucketName, _ := gp.Load("writer.bucket") + return bucketName +} diff --git a/internal/util/paramtableutil/paramtable_test.go b/internal/util/paramtable/paramtable_test.go similarity index 57% rename from internal/util/paramtableutil/paramtable_test.go rename to internal/util/paramtable/paramtable_test.go index ce5ac5d0ad..5957c72989 100644 --- a/internal/util/paramtableutil/paramtable_test.go +++ b/internal/util/paramtable/paramtable_test.go @@ -9,7 +9,7 @@ // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. -package paramtableutil +package paramtable import ( "testing" @@ -17,45 +17,51 @@ import ( "github.com/stretchr/testify/assert" ) -var param = NewGlobalParamsTable() +var Params = BaseTable{} + +func TestMain(m *testing.M) { + Params.Init() +} + +//func TestMain func TestGlobalParamsTable_SaveAndLoad(t *testing.T) { - err1 := param.Save("int", "10") + err1 := Params.Save("int", "10") assert.Nil(t, err1) - err2 := param.Save("string", "testSaveAndLoad") + err2 := Params.Save("string", "testSaveAndLoad") assert.Nil(t, err2) - err3 := param.Save("float", "1.234") + err3 := Params.Save("float", "1.234") assert.Nil(t, err3) - r1, _ := param.Load("int") + r1, _ := Params.Load("int") assert.Equal(t, "10", r1) - r2, _ := param.Load("string") + r2, _ := Params.Load("string") assert.Equal(t, "testSaveAndLoad", r2) - r3, _ := param.Load("float") + r3, _ := Params.Load("float") assert.Equal(t, "1.234", r3) - err4 := param.Remove("int") + err4 := Params.Remove("int") assert.Nil(t, err4) - err5 := param.Remove("string") + err5 := Params.Remove("string") assert.Nil(t, err5) - err6 := param.Remove("float") + err6 := Params.Remove("float") assert.Nil(t, err6) } func TestGlobalParamsTable_LoadRange(t *testing.T) { - _ = param.Save("abc", "10") - _ = param.Save("fghz", "20") - _ = param.Save("bcde", "1.1") - _ = param.Save("abcd", "testSaveAndLoad") - _ = param.Save("zhi", "12") + _ = Params.Save("abc", "10") + _ = Params.Save("fghz", "20") + _ = Params.Save("bcde", "1.1") + _ = Params.Save("abcd", "testSaveAndLoad") + _ = Params.Save("zhi", "12") - keys, values, err := param.LoadRange("a", "g", 10) + keys, values, err := Params.LoadRange("a", "g", 10) assert.Nil(t, err) assert.Equal(t, 4, len(keys)) assert.Equal(t, "10", values[0]) @@ -63,42 +69,42 @@ func TestGlobalParamsTable_LoadRange(t *testing.T) { assert.Equal(t, "1.1", values[2]) assert.Equal(t, "20", values[3]) - _ = param.Remove("abc") - _ = param.Remove("fghz") - _ = param.Remove("bcde") - _ = param.Remove("abcd") - _ = param.Remove("zhi") + _ = Params.Remove("abc") + _ = Params.Remove("fghz") + _ = Params.Remove("bcde") + _ = Params.Remove("abcd") + _ = Params.Remove("zhi") } func TestGlobalParamsTable_Remove(t *testing.T) { - err1 := param.Save("RemoveInt", "10") + err1 := Params.Save("RemoveInt", "10") assert.Nil(t, err1) - err2 := param.Save("RemoveString", "testRemove") + err2 := Params.Save("RemoveString", "testRemove") assert.Nil(t, err2) - err3 := param.Save("RemoveFloat", "1.234") + err3 := Params.Save("RemoveFloat", "1.234") assert.Nil(t, err3) - err4 := param.Remove("RemoveInt") + err4 := Params.Remove("RemoveInt") assert.Nil(t, err4) - err5 := param.Remove("RemoveString") + err5 := Params.Remove("RemoveString") assert.Nil(t, err5) - err6 := param.Remove("RemoveFloat") + err6 := Params.Remove("RemoveFloat") assert.Nil(t, err6) } func TestGlobalParamsTable_LoadYaml(t *testing.T) { - err := param.LoadYaml("config.yaml") + err := Params.LoadYaml("config.yaml") assert.Nil(t, err) - value1, err1 := param.Load("etcd.address") - value2, err2 := param.Load("pulsar.port") - value3, err3 := param.Load("reader.topicend") - value4, err4 := param.Load("proxy.pulsarTopics.readerTopicPrefix") - value5, err5 := param.Load("proxy.network.address") + value1, err1 := Params.Load("etcd.address") + value2, err2 := Params.Load("pulsar.port") + value3, err3 := Params.Load("reader.topicend") + value4, err4 := Params.Load("proxy.pulsarTopics.readerTopicPrefix") + value5, err5 := Params.Load("proxy.network.address") assert.Equal(t, value1, "localhost") assert.Equal(t, value2, "6650") diff --git a/internal/util/paramtableutil/kv.go b/internal/util/paramtableutil/kv.go deleted file mode 100644 index 8dfecf51b0..0000000000 --- a/internal/util/paramtableutil/kv.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package paramtableutil - -type Base interface { - Load(key string) (string, error) - LoadRange(key, endKey string, limit int) ([]string, []string, error) - LoadYaml(fileName string) error - Remove(key string) error - Save(key, value string) error -} diff --git a/internal/util/paramtableutil/paramtable.go b/internal/util/paramtableutil/paramtable.go deleted file mode 100644 index 1152e0efe1..0000000000 --- a/internal/util/paramtableutil/paramtable.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package paramtableutil - -import ( - "path" - "runtime" - "strings" - - "github.com/spf13/viper" - "github.com/zilliztech/milvus-distributed/internal/util/kvutil" -) - -type GlobalParamsTable struct { - params *kvutil.MemoryKV -} - -func NewGlobalParamsTable() Base { - return &GlobalParamsTable{params: kvutil.NewMemoryKV()} -} - -var GParams = NewGlobalParamsTable() - -func (gparams *GlobalParamsTable) Load(key string) (string, error) { - return gparams.params.Load(strings.ToLower(key)) -} - -func (gparams *GlobalParamsTable) LoadRange(key, endKey string, limit int) ([]string, []string, error) { - return gparams.params.LoadRange(strings.ToLower(key), strings.ToLower(endKey), limit) -} - -func (gparams *GlobalParamsTable) LoadYaml(fileName string) error { - config := viper.New() - _, fpath, _, _ := runtime.Caller(0) - configPath := path.Dir(fpath) + "/../../../configs/" - config.SetConfigFile(configPath + fileName) - if err := config.ReadInConfig(); err != nil { - panic(err) - } - - for _, key := range config.AllKeys() { - err := gparams.params.Save(strings.ToLower(key), config.GetString(key)) - if err != nil { - panic(err) - } - } - - return nil -} - -func (gparams *GlobalParamsTable) Remove(key string) error { - return gparams.params.Remove(strings.ToLower(key)) -} - -func (gparams *GlobalParamsTable) Save(key, value string) error { - return gparams.params.Save(strings.ToLower(key), value) -}