mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 02:48:45 +08:00
Support multi yaml files (#22583)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
This commit is contained in:
parent
e1c335ad59
commit
f1a60b295c
@ -34,8 +34,8 @@ func Init(opts ...Option) (*Manager, error) {
|
|||||||
opt(o)
|
opt(o)
|
||||||
}
|
}
|
||||||
sourceManager := NewManager()
|
sourceManager := NewManager()
|
||||||
if o.File != nil {
|
if o.FileInfo != nil {
|
||||||
s := NewFileSource(o.File)
|
s := NewFileSource(o.FileInfo)
|
||||||
sourceManager.AddSource(s)
|
sourceManager.AddSource(s)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -57,7 +57,7 @@ func TestConfigFromRemote(t *testing.T) {
|
|||||||
t.Setenv("TMP_KEY", "1")
|
t.Setenv("TMP_KEY", "1")
|
||||||
t.Setenv("log.level", "info")
|
t.Setenv("log.level", "info")
|
||||||
mgr, _ := Init(WithEnvSource(formatKey),
|
mgr, _ := Init(WithEnvSource(formatKey),
|
||||||
WithFilesSource(&FileInfo{"../../configs/milvus.yaml", -1}),
|
WithFilesSource(&FileInfo{[]string{"../../configs/milvus.yaml"}, -1}),
|
||||||
WithEtcdSource(&EtcdInfo{
|
WithEtcdSource(&EtcdInfo{
|
||||||
Endpoints: []string{cfg.ACUrls[0].Host},
|
Endpoints: []string{cfg.ACUrls[0].Host},
|
||||||
KeyPrefix: "test",
|
KeyPrefix: "test",
|
||||||
|
@ -31,7 +31,7 @@ import (
|
|||||||
|
|
||||||
type FileSource struct {
|
type FileSource struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
file string
|
files []string
|
||||||
configs map[string]string
|
configs map[string]string
|
||||||
|
|
||||||
configRefresher refresher
|
configRefresher refresher
|
||||||
@ -39,7 +39,7 @@ type FileSource struct {
|
|||||||
|
|
||||||
func NewFileSource(fileInfo *FileInfo) *FileSource {
|
func NewFileSource(fileInfo *FileInfo) *FileSource {
|
||||||
fs := &FileSource{
|
fs := &FileSource{
|
||||||
file: fileInfo.Filepath,
|
files: fileInfo.Files,
|
||||||
configs: make(map[string]string),
|
configs: make(map[string]string),
|
||||||
}
|
}
|
||||||
fs.configRefresher = newRefresher(fileInfo.RefreshInterval, fs.loadFromFile)
|
fs.configRefresher = newRefresher(fileInfo.RefreshInterval, fs.loadFromFile)
|
||||||
@ -95,46 +95,46 @@ func (fs *FileSource) SetEventHandler(eh EventHandler) {
|
|||||||
}
|
}
|
||||||
func (fs *FileSource) loadFromFile() error {
|
func (fs *FileSource) loadFromFile() error {
|
||||||
yamlReader := viper.New()
|
yamlReader := viper.New()
|
||||||
configFile := fs.file
|
|
||||||
if _, err := os.Stat(configFile); err != nil {
|
|
||||||
return errors.New("cannot access config file: " + configFile)
|
|
||||||
}
|
|
||||||
|
|
||||||
yamlReader.SetConfigFile(configFile)
|
|
||||||
if err := yamlReader.ReadInConfig(); err != nil {
|
|
||||||
log.Warn("Read config failed", zap.Error(err))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
newConfig := make(map[string]string)
|
newConfig := make(map[string]string)
|
||||||
for _, key := range yamlReader.AllKeys() {
|
for _, configFile := range fs.files {
|
||||||
val := yamlReader.Get(key)
|
if _, err := os.Stat(configFile); err != nil {
|
||||||
str, err := cast.ToStringE(val)
|
log.Info("cannot access config file", zap.String("configFile", configFile))
|
||||||
if err != nil {
|
continue
|
||||||
switch val := val.(type) {
|
|
||||||
case []any:
|
|
||||||
str = str[:0]
|
|
||||||
for _, v := range val {
|
|
||||||
ss, err := cast.ToStringE(v)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("cast to string failed", zap.Any("value", v))
|
|
||||||
}
|
|
||||||
if str == "" {
|
|
||||||
str = ss
|
|
||||||
} else {
|
|
||||||
str = str + "," + ss
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
log.Warn("val is not a slice", zap.Any("value", val))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
newConfig[key] = str
|
|
||||||
newConfig[formatKey(key)] = str
|
|
||||||
}
|
|
||||||
|
|
||||||
|
yamlReader.SetConfigFile(configFile)
|
||||||
|
if err := yamlReader.ReadInConfig(); err != nil {
|
||||||
|
return errors.Wrap(err, "Read config failed: "+configFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range yamlReader.AllKeys() {
|
||||||
|
val := yamlReader.Get(key)
|
||||||
|
str, err := cast.ToStringE(val)
|
||||||
|
if err != nil {
|
||||||
|
switch val := val.(type) {
|
||||||
|
case []any:
|
||||||
|
str = str[:0]
|
||||||
|
for _, v := range val {
|
||||||
|
ss, err := cast.ToStringE(v)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("cast to string failed", zap.Any("value", v))
|
||||||
|
}
|
||||||
|
if str == "" {
|
||||||
|
str = ss
|
||||||
|
} else {
|
||||||
|
str = str + "," + ss
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Warn("val is not a slice", zap.Any("value", val))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
newConfig[key] = str
|
||||||
|
newConfig[formatKey(key)] = str
|
||||||
|
}
|
||||||
|
}
|
||||||
fs.Lock()
|
fs.Lock()
|
||||||
defer fs.Unlock()
|
defer fs.Unlock()
|
||||||
err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig)
|
err := fs.configRefresher.fireEvents(fs.GetSourceName(), fs.configs, newConfig)
|
||||||
|
@ -49,13 +49,13 @@ type EtcdInfo struct {
|
|||||||
|
|
||||||
// FileInfo has attribute for file source
|
// FileInfo has attribute for file source
|
||||||
type FileInfo struct {
|
type FileInfo struct {
|
||||||
Filepath string
|
Files []string
|
||||||
RefreshInterval time.Duration
|
RefreshInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Options hold options
|
// Options hold options
|
||||||
type Options struct {
|
type Options struct {
|
||||||
File *FileInfo
|
FileInfo *FileInfo
|
||||||
EtcdInfo *EtcdInfo
|
EtcdInfo *EtcdInfo
|
||||||
EnvKeyFormatter func(string) string
|
EnvKeyFormatter func(string) string
|
||||||
}
|
}
|
||||||
@ -66,7 +66,7 @@ type Option func(options *Options)
|
|||||||
// WithRequiredFiles tell archaius to manage files, if not exist will return error
|
// WithRequiredFiles tell archaius to manage files, if not exist will return error
|
||||||
func WithFilesSource(fi *FileInfo) Option {
|
func WithFilesSource(fi *FileInfo) Option {
|
||||||
return func(options *Options) {
|
return func(options *Options) {
|
||||||
options.File = fi
|
options.FileInfo = fi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -24,13 +26,27 @@ import (
|
|||||||
|
|
||||||
func TestLoadFromFileSource(t *testing.T) {
|
func TestLoadFromFileSource(t *testing.T) {
|
||||||
t.Run("file not exist", func(t *testing.T) {
|
t.Run("file not exist", func(t *testing.T) {
|
||||||
fs := NewFileSource(&FileInfo{"file_not_exist.yaml", -1})
|
fs := NewFileSource(&FileInfo{[]string{"file_not_exist.yaml"}, -1})
|
||||||
err := fs.loadFromFile()
|
_ = fs.loadFromFile()
|
||||||
assert.Error(t, err, "cannot access config file: file_not_exist.yaml")
|
assert.Zero(t, len(fs.configs))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("file type not support", func(t *testing.T) {
|
t.Run("file type not support", func(t *testing.T) {
|
||||||
fs := NewFileSource(&FileInfo{"../../go.mod", -1})
|
fs := NewFileSource(&FileInfo{[]string{"../../go.mod"}, -1})
|
||||||
err := fs.loadFromFile()
|
err := fs.loadFromFile()
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("multiple files", func(t *testing.T) {
|
||||||
|
dir, _ := os.MkdirTemp("", "milvus")
|
||||||
|
os.WriteFile(path.Join(dir, "milvus.yaml"), []byte("a.b: 1\nc.d: 2"), 0600)
|
||||||
|
os.WriteFile(path.Join(dir, "user.yaml"), []byte("a.b: 3"), 0600)
|
||||||
|
|
||||||
|
fs := NewFileSource(&FileInfo{[]string{path.Join(dir, "milvus.yaml"), path.Join(dir, "user.yaml")}, -1})
|
||||||
|
fs.loadFromFile()
|
||||||
|
v1, _ := fs.GetConfigurationByKey("a.b")
|
||||||
|
assert.Equal(t, "3", v1)
|
||||||
|
v2, _ := fs.GetConfigurationByKey("c.d")
|
||||||
|
assert.Equal(t, "2", v2)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/util/etcd"
|
"github.com/milvus-io/milvus/internal/util/etcd"
|
||||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
|
"github.com/samber/lo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,7 +31,6 @@ import (
|
|||||||
type UniqueID = typeutil.UniqueID
|
type UniqueID = typeutil.UniqueID
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultMilvusYaml = "milvus.yaml"
|
|
||||||
DefaultEasyloggingYaml = "easylogging.yaml"
|
DefaultEasyloggingYaml = "easylogging.yaml"
|
||||||
DefaultMinioHost = "localhost"
|
DefaultMinioHost = "localhost"
|
||||||
DefaultMinioPort = "9000"
|
DefaultMinioPort = "9000"
|
||||||
@ -53,7 +53,7 @@ func globalConfigPrefixs() []string {
|
|||||||
return []string{"metastore.", "localStorage.", "etcd.", "mysql.", "minio.", "pulsar.", "kafka.", "rocksmq.", "log.", "grpc.", "common.", "quotaAndLimits."}
|
return []string{"metastore.", "localStorage.", "etcd.", "mysql.", "minio.", "pulsar.", "kafka.", "rocksmq.", "log.", "grpc.", "common.", "quotaAndLimits."}
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultYaml = DefaultMilvusYaml
|
var defaultYaml = []string{"milvus.yaml", "default.yaml", "user.yaml"}
|
||||||
|
|
||||||
// BaseTable the basics of paramtable
|
// BaseTable the basics of paramtable
|
||||||
type BaseTable struct {
|
type BaseTable struct {
|
||||||
@ -61,17 +61,17 @@ type BaseTable struct {
|
|||||||
mgr *config.Manager
|
mgr *config.Manager
|
||||||
|
|
||||||
configDir string
|
configDir string
|
||||||
YamlFile string
|
YamlFiles []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBaseTableFromYamlOnly only used in migration tool.
|
// NewBaseTableFromYamlOnly only used in migration tool.
|
||||||
// Maybe we shouldn't limit the configDir internally.
|
// Maybe we shouldn't limit the configDir internally.
|
||||||
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
|
func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
|
||||||
mgr, _ := config.Init(config.WithFilesSource(&config.FileInfo{
|
mgr, _ := config.Init(config.WithFilesSource(&config.FileInfo{
|
||||||
Filepath: yaml,
|
Files: []string{yaml},
|
||||||
RefreshInterval: 10 * time.Second,
|
RefreshInterval: 10 * time.Second,
|
||||||
}))
|
}))
|
||||||
gp := &BaseTable{mgr: mgr, YamlFile: yaml}
|
gp := &BaseTable{mgr: mgr, YamlFiles: []string{yaml}}
|
||||||
return gp
|
return gp
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,7 +81,7 @@ func NewBaseTableFromYamlOnly(yaml string) *BaseTable {
|
|||||||
// GlobalInitWithYaml should be called only in standalone and embedded Milvus.
|
// GlobalInitWithYaml should be called only in standalone and embedded Milvus.
|
||||||
func (gp *BaseTable) GlobalInitWithYaml(yaml string) {
|
func (gp *BaseTable) GlobalInitWithYaml(yaml string) {
|
||||||
gp.once.Do(func() {
|
gp.once.Do(func() {
|
||||||
defaultYaml = yaml
|
defaultYaml = []string{yaml}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,8 +96,8 @@ func (gp *BaseTable) init(refreshInterval int) {
|
|||||||
ret = strings.ReplaceAll(ret, ".", "")
|
ret = strings.ReplaceAll(ret, ".", "")
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
if gp.YamlFile == "" {
|
if len(gp.YamlFiles) == 0 {
|
||||||
gp.YamlFile = DefaultMilvusYaml
|
gp.YamlFiles = defaultYaml
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
gp.mgr, err = config.Init(config.WithEnvSource(formatter))
|
gp.mgr, err = config.Init(config.WithEnvSource(formatter))
|
||||||
@ -110,13 +110,14 @@ func (gp *BaseTable) init(refreshInterval int) {
|
|||||||
|
|
||||||
func (gp *BaseTable) initConfigsFromLocal(refreshInterval int) {
|
func (gp *BaseTable) initConfigsFromLocal(refreshInterval int) {
|
||||||
gp.configDir = gp.initConfPath()
|
gp.configDir = gp.initConfPath()
|
||||||
configFilePath := gp.configDir + "/" + gp.YamlFile
|
|
||||||
err := gp.mgr.AddSource(config.NewFileSource(&config.FileInfo{
|
err := gp.mgr.AddSource(config.NewFileSource(&config.FileInfo{
|
||||||
Filepath: configFilePath,
|
Files: lo.Map(gp.YamlFiles, func(file string, _ int) string {
|
||||||
|
return path.Join(gp.configDir, file)
|
||||||
|
}),
|
||||||
RefreshInterval: time.Duration(refreshInterval) * time.Second,
|
RefreshInterval: time.Duration(refreshInterval) * time.Second,
|
||||||
}))
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("init baseTable with file failed", zap.String("configFile", configFilePath), zap.Error(err))
|
log.Warn("init baseTable with file failed", zap.Strings("configFile", gp.YamlFiles), zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,7 @@ type hookConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *hookConfig) init() {
|
func (h *hookConfig) init() {
|
||||||
base := &BaseTable{YamlFile: hookYamlFile}
|
base := &BaseTable{YamlFiles: []string{hookYamlFile}}
|
||||||
base.init(0)
|
base.init(0)
|
||||||
log.Info("hook config", zap.Any("hook", base.FileConfigs()))
|
log.Info("hook config", zap.Any("hook", base.FileConfigs()))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user