mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
Add var-name sub linter in revive (#27424)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
5c1abfa2cc
commit
cd5f03f80c
@ -48,6 +48,11 @@ linters-settings:
|
||||
rules:
|
||||
- name: unused-parameter
|
||||
disabled: true
|
||||
- name: var-naming
|
||||
severity: warning
|
||||
disabled: false
|
||||
arguments:
|
||||
- ["ID"] # Allow list
|
||||
misspell:
|
||||
locale: US
|
||||
gocritic:
|
||||
|
@ -4167,7 +4167,7 @@ func TestDataCoordServer_UpdateChannelCheckpoint(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
var global_test_tikv = tikv.SetupLocalTxn()
|
||||
var globalTestTikv = tikv.SetupLocalTxn()
|
||||
|
||||
func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
var err error
|
||||
@ -4188,7 +4188,7 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
|
||||
svr := CreateServer(context.TODO(), factory)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
svr.SetTiKVClient(global_test_tikv)
|
||||
svr.SetTiKVClient(globalTestTikv)
|
||||
|
||||
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return newMockDataNodeClient(0, receiveCh)
|
||||
@ -4242,7 +4242,7 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts ..
|
||||
|
||||
svr := CreateServer(context.TODO(), factory, opts...)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
svr.SetTiKVClient(global_test_tikv)
|
||||
svr.SetTiKVClient(globalTestTikv)
|
||||
|
||||
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return newMockDataNodeClient(0, receiveCh)
|
||||
@ -4299,7 +4299,7 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
|
||||
|
||||
svr := CreateServer(context.TODO(), factory, opts...)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
svr.SetTiKVClient(global_test_tikv)
|
||||
svr.SetTiKVClient(globalTestTikv)
|
||||
|
||||
svr.dataNodeCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return newMockDataNodeClient(0, receiveCh)
|
||||
@ -4495,7 +4495,7 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {
|
||||
|
||||
svr := CreateServer(ctx, factory, opts...)
|
||||
svr.SetEtcdClient(etcdCli)
|
||||
svr.SetTiKVClient(global_test_tikv)
|
||||
svr.SetTiKVClient(globalTestTikv)
|
||||
|
||||
svr.SetDataNodeCreator(func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
|
||||
return newMockDataNodeClient(0, nil)
|
||||
|
@ -80,8 +80,8 @@ func setupRemoteTiKV() {
|
||||
}
|
||||
}
|
||||
|
||||
func setupTiKV(use_remote bool) {
|
||||
if use_remote {
|
||||
func setupTiKV(useRemote bool) {
|
||||
if useRemote {
|
||||
setupRemoteTiKV()
|
||||
} else {
|
||||
setupLocalTiKV()
|
||||
|
@ -42,7 +42,7 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
// A quick note is that we are using logging_error at our outermost scope in order to perform logging
|
||||
// A quick note is that we are using loggingErr at our outermost scope in order to perform logging
|
||||
|
||||
const (
|
||||
// We are using a Snapshot instead of transaction when doing read only operations due to the
|
||||
@ -135,8 +135,8 @@ func (kv *txnTiKV) Has(key string) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV Has() error", zap.String("key", key))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV Has() error", zap.String("key", key))
|
||||
|
||||
_, err := kv.getTiKVMeta(ctx, key)
|
||||
if err != nil {
|
||||
@ -144,8 +144,8 @@ func (kv *txnTiKV) Has(key string) (bool, error) {
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return false, nil
|
||||
} else {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to read key: %s", key))
|
||||
return false, logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to read key: %s", key))
|
||||
return false, loggingErr
|
||||
}
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV Has() operation", zap.String("key", key))
|
||||
@ -163,8 +163,8 @@ func (kv *txnTiKV) HasPrefix(prefix string) (bool, error) {
|
||||
start := time.Now()
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV HasPrefix() error", zap.String("prefix", prefix))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV HasPrefix() error", zap.String("prefix", prefix))
|
||||
|
||||
ss := getSnapshot(kv.txn, SnapshotScanSize)
|
||||
|
||||
@ -174,8 +174,8 @@ func (kv *txnTiKV) HasPrefix(prefix string) (bool, error) {
|
||||
|
||||
iter, err := ss.Iter(startKey, endKey)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to create iterator for prefix: %s", prefix))
|
||||
return false, logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to create iterator for prefix: %s", prefix))
|
||||
return false, loggingErr
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
@ -195,17 +195,17 @@ func (kv *txnTiKV) Load(key string) (string, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV Load() error", zap.String("key", key))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV Load() error", zap.String("key", key))
|
||||
|
||||
val, err := kv.getTiKVMeta(ctx, key)
|
||||
if err != nil {
|
||||
if common.IsKeyNotExistError(err) {
|
||||
logging_error = err
|
||||
loggingErr = err
|
||||
} else {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to read key %s", key))
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to read key %s", key))
|
||||
}
|
||||
return "", logging_error
|
||||
return "", loggingErr
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV Load() operation", zap.String("key", key))
|
||||
return val, nil
|
||||
@ -226,40 +226,40 @@ func (kv *txnTiKV) MultiLoad(keys []string) ([]string, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV MultiLoad() error", zap.Strings("keys", keys))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV MultiLoad() error", zap.Strings("keys", keys))
|
||||
|
||||
// Convert from []string to [][]byte
|
||||
byte_keys := batchConvertFromString(kv.rootPath, keys)
|
||||
byteKeys := batchConvertFromString(kv.rootPath, keys)
|
||||
|
||||
// Since only reading, use Snapshot for less overhead
|
||||
ss := getSnapshot(kv.txn, SnapshotScanSize)
|
||||
|
||||
key_map, err := ss.BatchGet(ctx, byte_keys)
|
||||
keyMap, err := ss.BatchGet(ctx, byteKeys)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed ss.BatchGet() for MultiLoad")
|
||||
return nil, logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed ss.BatchGet() for MultiLoad")
|
||||
return nil, loggingErr
|
||||
}
|
||||
|
||||
missing_values := []string{}
|
||||
valid_values := []string{}
|
||||
missingValues := []string{}
|
||||
validValues := []string{}
|
||||
|
||||
// Loop through keys and build valid/invalid slices
|
||||
for _, k := range keys {
|
||||
v, ok := key_map[k]
|
||||
v, ok := keyMap[k]
|
||||
if !ok {
|
||||
missing_values = append(missing_values, k)
|
||||
missingValues = append(missingValues, k)
|
||||
}
|
||||
// Check if empty value placeholder
|
||||
str_val := convertEmptyByteToString(v)
|
||||
valid_values = append(valid_values, str_val)
|
||||
strVal := convertEmptyByteToString(v)
|
||||
validValues = append(validValues, strVal)
|
||||
}
|
||||
if len(missing_values) != 0 {
|
||||
logging_error = fmt.Errorf("There are invalid keys: %s", missing_values)
|
||||
if len(missingValues) != 0 {
|
||||
loggingErr = fmt.Errorf("There are invalid keys: %s", missingValues)
|
||||
}
|
||||
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV MultiLoad() operation", zap.Any("keys", keys))
|
||||
return valid_values, logging_error
|
||||
return validValues, loggingErr
|
||||
}
|
||||
|
||||
// LoadWithPrefix returns all the keys and values for the given key prefix.
|
||||
@ -267,8 +267,8 @@ func (kv *txnTiKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
start := time.Now()
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV LoadWithPrefix() error", zap.String("prefix", prefix))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV LoadWithPrefix() error", zap.String("prefix", prefix))
|
||||
|
||||
ss := getSnapshot(kv.txn, SnapshotScanSize)
|
||||
|
||||
@ -277,8 +277,8 @@ func (kv *txnTiKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
endKey := tikv.PrefixNextKey([]byte(prefix))
|
||||
iter, err := ss.Iter(startKey, endKey)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for LoadWithPrefix() for prefix: %s", prefix))
|
||||
return nil, nil, logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for LoadWithPrefix() for prefix: %s", prefix))
|
||||
return nil, nil, loggingErr
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
@ -289,13 +289,13 @@ func (kv *txnTiKV) LoadWithPrefix(prefix string) ([]string, []string, error) {
|
||||
for iter.Valid() {
|
||||
val := iter.Value()
|
||||
// Check if empty value placeholder
|
||||
str_val := convertEmptyByteToString(val)
|
||||
strVal := convertEmptyByteToString(val)
|
||||
keys = append(keys, string(iter.Key()))
|
||||
values = append(values, str_val)
|
||||
values = append(values, strVal)
|
||||
err = iter.Next()
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to iterate for LoadWithPrefix() for prefix: %s", prefix))
|
||||
return nil, nil, logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to iterate for LoadWithPrefix() for prefix: %s", prefix))
|
||||
return nil, nil, loggingErr
|
||||
}
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV LoadWithPrefix() operation", zap.String("prefix", prefix))
|
||||
@ -308,11 +308,11 @@ func (kv *txnTiKV) Save(key, value string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV Save() error", zap.String("key", key), zap.String("value", value))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV Save() error", zap.String("key", key), zap.String("value", value))
|
||||
|
||||
logging_error = kv.putTiKVMeta(ctx, key, value)
|
||||
return logging_error
|
||||
loggingErr = kv.putTiKVMeta(ctx, key, value)
|
||||
return loggingErr
|
||||
}
|
||||
|
||||
// MultiSave saves the input key-value pairs in transaction manner.
|
||||
@ -321,37 +321,37 @@ func (kv *txnTiKV) MultiSave(kvs map[string]string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV MultiSave() error", zap.Any("kvs", kvs), zap.Int("len", len(kvs)))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV MultiSave() error", zap.Any("kvs", kvs), zap.Int("len", len(kvs)))
|
||||
|
||||
txn, err := beginTxn(kv.txn)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to create txn for MultiSave")
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed to create txn for MultiSave")
|
||||
return loggingErr
|
||||
}
|
||||
|
||||
// Defer a rollback only if the transaction hasn't been committed
|
||||
defer rollbackOnFailure(&logging_error, txn)
|
||||
defer rollbackOnFailure(&loggingErr, txn)
|
||||
|
||||
for key, value := range kvs {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
// Check if value is empty or taking reserved EmptyValue
|
||||
byte_value, err := convertEmptyStringToByte(value)
|
||||
byteValue, err := convertEmptyStringToByte(value)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSave()", key, value))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSave()", key, value))
|
||||
return loggingErr
|
||||
}
|
||||
// Save the value within a transaction
|
||||
err = txn.Set([]byte(key), byte_value)
|
||||
err = txn.Set([]byte(key), byteValue)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSave()", key, value))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSave()", key, value))
|
||||
return loggingErr
|
||||
}
|
||||
}
|
||||
err = kv.executeTxn(txn, ctx)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to commit for MultiSave()")
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed to commit for MultiSave()")
|
||||
return loggingErr
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV MultiSave() operation", zap.Any("kvs", kvs))
|
||||
return nil
|
||||
@ -363,11 +363,11 @@ func (kv *txnTiKV) Remove(key string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV Remove() error", zap.String("key", key))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV Remove() error", zap.String("key", key))
|
||||
|
||||
logging_error = kv.removeTiKVMeta(ctx, key)
|
||||
return logging_error
|
||||
loggingErr = kv.removeTiKVMeta(ctx, key)
|
||||
return loggingErr
|
||||
}
|
||||
|
||||
// MultiRemove removes the input keys in transaction manner.
|
||||
@ -376,31 +376,31 @@ func (kv *txnTiKV) MultiRemove(keys []string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV MultiRemove() error", zap.Strings("keys", keys), zap.Int("len", len(keys)))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV MultiRemove() error", zap.Strings("keys", keys), zap.Int("len", len(keys)))
|
||||
|
||||
txn, err := beginTxn(kv.txn)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to create txn for MultiRemove")
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed to create txn for MultiRemove")
|
||||
return loggingErr
|
||||
}
|
||||
|
||||
// Defer a rollback only if the transaction hasn't been committed
|
||||
defer rollbackOnFailure(&logging_error, txn)
|
||||
defer rollbackOnFailure(&loggingErr, txn)
|
||||
|
||||
for _, key := range keys {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
logging_error = txn.Delete([]byte(key))
|
||||
loggingErr = txn.Delete([]byte(key))
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiRemove", key))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to delete %s for MultiRemove", key))
|
||||
return loggingErr
|
||||
}
|
||||
}
|
||||
|
||||
err = kv.executeTxn(txn, ctx)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to commit for MultiRemove()")
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed to commit for MultiRemove()")
|
||||
return loggingErr
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV MultiRemove() operation", zap.Strings("keys", keys))
|
||||
return nil
|
||||
@ -413,15 +413,15 @@ func (kv *txnTiKV) RemoveWithPrefix(prefix string) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV RemoveWithPrefix() error", zap.String("prefix", prefix))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV RemoveWithPrefix() error", zap.String("prefix", prefix))
|
||||
|
||||
startKey := []byte(prefix)
|
||||
endKey := tikv.PrefixNextKey(startKey)
|
||||
_, err := kv.txn.DeleteRange(ctx, startKey, endKey, 1)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, "Failed to DeleteRange for RemoveWithPrefix")
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, "Failed to DeleteRange for RemoveWithPrefix")
|
||||
return loggingErr
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV RemoveWithPrefix() operation", zap.String("prefix", prefix))
|
||||
return nil
|
||||
@ -461,12 +461,12 @@ func (kv *txnTiKV) MultiSaveAndRemove(saves map[string]string, removals []string
|
||||
for key, value := range saves {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
// Check if value is empty or taking reserved EmptyValue
|
||||
byte_value, err := convertEmptyStringToByte(value)
|
||||
byteValue, err := convertEmptyStringToByte(value)
|
||||
if err != nil {
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemove", key, value))
|
||||
return loggingErr
|
||||
}
|
||||
err = txn.Set([]byte(key), byte_value)
|
||||
err = txn.Set([]byte(key), byteValue)
|
||||
if err != nil {
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemove", key, value))
|
||||
return loggingErr
|
||||
@ -525,12 +525,12 @@ func (kv *txnTiKV) MultiSaveAndRemoveWithPrefix(saves map[string]string, removal
|
||||
for key, value := range saves {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
// Check if value is empty or taking reserved EmptyValue
|
||||
byte_value, err := convertEmptyStringToByte(value)
|
||||
byteValue, err := convertEmptyStringToByte(value)
|
||||
if err != nil {
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
|
||||
return loggingErr
|
||||
}
|
||||
err = txn.Set([]byte(key), byte_value)
|
||||
err = txn.Set([]byte(key), byteValue)
|
||||
if err != nil {
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to set (%s:%s) for MultiSaveAndRemoveWithPrefix()", key, value))
|
||||
return loggingErr
|
||||
@ -581,8 +581,8 @@ func (kv *txnTiKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]b
|
||||
start := time.Now()
|
||||
prefix = path.Join(kv.rootPath, prefix)
|
||||
|
||||
var logging_error error
|
||||
defer logWarnOnFailure(&logging_error, "txnTiKV WalkWithPagination error", zap.String("prefix", prefix))
|
||||
var loggingErr error
|
||||
defer logWarnOnFailure(&loggingErr, "txnTiKV WalkWithPagination error", zap.String("prefix", prefix))
|
||||
|
||||
// Since only reading, use Snapshot for less overhead
|
||||
ss := getSnapshot(kv.txn, paginationSize)
|
||||
@ -592,28 +592,28 @@ func (kv *txnTiKV) WalkWithPrefix(prefix string, paginationSize int, fn func([]b
|
||||
endKey := tikv.PrefixNextKey([]byte(prefix))
|
||||
iter, err := ss.Iter(startKey, endKey)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during WalkWithPrefix", prefix))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to create iterater for %s during WalkWithPrefix", prefix))
|
||||
return loggingErr
|
||||
}
|
||||
defer iter.Close()
|
||||
|
||||
// Iterate over the key-value pairs
|
||||
for iter.Valid() {
|
||||
// Grab value for empty check
|
||||
byte_val := iter.Value()
|
||||
byteVal := iter.Value()
|
||||
// Check if empty val and replace with placeholder
|
||||
if isEmptyByte(byte_val) {
|
||||
byte_val = []byte{}
|
||||
if isEmptyByte(byteVal) {
|
||||
byteVal = []byte{}
|
||||
}
|
||||
err = fn(iter.Key(), byte_val)
|
||||
err = fn(iter.Key(), byteVal)
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to apply fn to (%s;%s)", string(iter.Key()), string(byte_val)))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to apply fn to (%s;%s)", string(iter.Key()), string(byteVal)))
|
||||
return loggingErr
|
||||
}
|
||||
err = iter.Next()
|
||||
if err != nil {
|
||||
logging_error = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for WalkWithPrefix", string(iter.Key())))
|
||||
return logging_error
|
||||
loggingErr = errors.Wrap(err, fmt.Sprintf("Failed to move Iterator after key %s for WalkWithPrefix", string(iter.Key())))
|
||||
return loggingErr
|
||||
}
|
||||
}
|
||||
CheckElapseAndWarn(start, "Slow txnTiKV WalkWithPagination() operation", zap.String("prefix", prefix))
|
||||
@ -658,7 +658,7 @@ func (kv *txnTiKV) getTiKVMeta(ctx context.Context, key string) (string, error)
|
||||
}
|
||||
|
||||
// Check if value is the empty placeholder
|
||||
str_val := convertEmptyByteToString(val)
|
||||
strVal := convertEmptyByteToString(val)
|
||||
|
||||
elapsed := start.ElapseSpan()
|
||||
|
||||
@ -667,7 +667,7 @@ func (kv *txnTiKV) getTiKVMeta(ctx context.Context, key string) (string, error)
|
||||
metrics.MetaRequestLatency.WithLabelValues(metrics.MetaGetLabel).Observe(float64(elapsed.Milliseconds()))
|
||||
metrics.MetaOpCounter.WithLabelValues(metrics.MetaGetLabel, metrics.SuccessLabel).Inc()
|
||||
|
||||
return str_val, nil
|
||||
return strVal, nil
|
||||
}
|
||||
|
||||
func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
|
||||
@ -683,12 +683,12 @@ func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
|
||||
// Defer a rollback only if the transaction hasn't been committed
|
||||
defer rollbackOnFailure(&err, txn)
|
||||
|
||||
// Check if the value being written needs to be empty plaeholder
|
||||
byte_value, err := convertEmptyStringToByte(val)
|
||||
// Check if the value being written needs to be empty placeholder
|
||||
byteValue, err := convertEmptyStringToByte(val)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("Failed to cast to byte (%s:%s) for putTiKVMeta", key, val))
|
||||
}
|
||||
err = txn.Set([]byte(key), byte_value)
|
||||
err = txn.Set([]byte(key), byteValue)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, fmt.Sprintf("Failed to set value for key %s in putTiKVMeta", key))
|
||||
}
|
||||
@ -697,7 +697,7 @@ func (kv *txnTiKV) putTiKVMeta(ctx context.Context, key, val string) error {
|
||||
elapsed := start.ElapseSpan()
|
||||
metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.TotalLabel).Inc()
|
||||
if err == nil {
|
||||
metrics.MetaKvSize.WithLabelValues(metrics.MetaPutLabel).Observe(float64(len(byte_value)))
|
||||
metrics.MetaKvSize.WithLabelValues(metrics.MetaPutLabel).Observe(float64(len(byteValue)))
|
||||
metrics.MetaRequestLatency.WithLabelValues(metrics.MetaPutLabel).Observe(float64(elapsed.Milliseconds()))
|
||||
metrics.MetaOpCounter.WithLabelValues(metrics.MetaPutLabel, metrics.SuccessLabel).Inc()
|
||||
} else {
|
||||
|
@ -561,7 +561,7 @@ func TestEmptyKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestScanSize(t *testing.T) {
|
||||
scan_size := SnapshotScanSize
|
||||
scanSize := SnapshotScanSize
|
||||
kv := NewTiKV(txnClient, "/")
|
||||
err := kv.RemoveWithPrefix("")
|
||||
require.NoError(t, err)
|
||||
@ -570,18 +570,18 @@ func TestScanSize(t *testing.T) {
|
||||
defer kv.RemoveWithPrefix("")
|
||||
|
||||
// Test total > scansize
|
||||
key_map := map[string]string{}
|
||||
for i := 1; i <= scan_size+100; i++ {
|
||||
keyMap := map[string]string{}
|
||||
for i := 1; i <= scanSize+100; i++ {
|
||||
a := fmt.Sprintf("%v", i)
|
||||
key_map[a] = a
|
||||
keyMap[a] = a
|
||||
}
|
||||
|
||||
err = kv.MultiSave(key_map)
|
||||
err = kv.MultiSave(keyMap)
|
||||
assert.NoError(t, err)
|
||||
|
||||
keys, _, err := kv.LoadWithPrefix("")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, len(keys), scan_size+100)
|
||||
assert.Equal(t, len(keys), scanSize+100)
|
||||
|
||||
err = kv.RemoveWithPrefix("")
|
||||
require.NoError(t, err)
|
||||
|
@ -31,14 +31,16 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
var checkRoundTaskNumLimit = 256
|
||||
const (
|
||||
segmentChecker = "segment_checker"
|
||||
channelChecker = "channel_checker"
|
||||
balanceChecker = "balance_checker"
|
||||
indexChecker = "index_checker"
|
||||
)
|
||||
|
||||
var (
|
||||
Segment_Checker = "segment_checker"
|
||||
Channel_Checker = "channel_checker"
|
||||
Balance_Checker = "balance_checker"
|
||||
Index_Checker = "index_checker"
|
||||
CheckerOrder = []string{Channel_Checker, Segment_Checker, Balance_Checker, Index_Checker}
|
||||
checkRoundTaskNumLimit = 256
|
||||
checkerOrder = []string{channelChecker, segmentChecker, balanceChecker, indexChecker}
|
||||
)
|
||||
|
||||
type CheckerController struct {
|
||||
@ -69,21 +71,21 @@ func NewCheckerController(
|
||||
// CheckerController runs checkers with the order,
|
||||
// the former checker has higher priority
|
||||
checkers := map[string]Checker{
|
||||
Channel_Checker: NewChannelChecker(meta, dist, targetMgr, balancer),
|
||||
Segment_Checker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
|
||||
Balance_Checker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
|
||||
Index_Checker: NewIndexChecker(meta, dist, broker),
|
||||
channelChecker: NewChannelChecker(meta, dist, targetMgr, balancer),
|
||||
segmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
|
||||
balanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
|
||||
indexChecker: NewIndexChecker(meta, dist, broker),
|
||||
}
|
||||
|
||||
id := 0
|
||||
for _, checkerName := range CheckerOrder {
|
||||
for _, checkerName := range checkerOrder {
|
||||
checkers[checkerName].SetID(int64(id + 1))
|
||||
}
|
||||
|
||||
manualCheckChs := map[string]chan struct{}{
|
||||
Channel_Checker: make(chan struct{}, 1),
|
||||
Segment_Checker: make(chan struct{}, 1),
|
||||
Balance_Checker: make(chan struct{}, 1),
|
||||
channelChecker: make(chan struct{}, 1),
|
||||
segmentChecker: make(chan struct{}, 1),
|
||||
balanceChecker: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
return &CheckerController{
|
||||
@ -108,13 +110,13 @@ func (controller *CheckerController) Start() {
|
||||
|
||||
func getCheckerInterval(checkerType string) time.Duration {
|
||||
switch checkerType {
|
||||
case Segment_Checker:
|
||||
case segmentChecker:
|
||||
return Params.QueryCoordCfg.SegmentCheckInterval.GetAsDuration(time.Millisecond)
|
||||
case Channel_Checker:
|
||||
case channelChecker:
|
||||
return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond)
|
||||
case Balance_Checker:
|
||||
case balanceChecker:
|
||||
return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond)
|
||||
case Index_Checker:
|
||||
case indexChecker:
|
||||
return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
|
||||
default:
|
||||
return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)
|
||||
|
Loading…
Reference in New Issue
Block a user