mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Add log for the large value during etcd saving (#18229)
Signed-off-by: yun.zhang <yun.zhang@zilliz.com> Signed-off-by: yun.zhang <yun.zhang@zilliz.com>
This commit is contained in:
parent
00c0480784
commit
237d912625
@ -18,6 +18,7 @@ package etcdkv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
@ -322,6 +323,7 @@ func (kv *EtcdKV) Save(key, value string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.client.Put(ctx, key, value)
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key))
|
||||
return err
|
||||
@ -333,6 +335,7 @@ func (kv *EtcdKV) SaveBytes(key string, value []byte) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.client.Put(ctx, key, string(value))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save", zap.String("key", key))
|
||||
return err
|
||||
@ -345,6 +348,7 @@ func (kv *EtcdKV) SaveWithLease(key, value string, id clientv3.LeaseID) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.client.Put(ctx, key, value, clientv3.WithLease(id))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
@ -357,6 +361,7 @@ func (kv *EtcdKV) SaveWithIgnoreLease(key, value string) error {
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.client.Put(ctx, key, value, clientv3.WithIgnoreLease())
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
@ -368,6 +373,7 @@ func (kv *EtcdKV) SaveBytesWithLease(key string, value []byte, id clientv3.Lease
|
||||
key = path.Join(kv.rootPath, key)
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
CheckValueSizeAndWarn(key, value)
|
||||
_, err := kv.client.Put(ctx, key, string(value), clientv3.WithLease(id))
|
||||
CheckElapseAndWarn(start, "Slow etcd operation save with lease", zap.String("key", key))
|
||||
return err
|
||||
@ -386,6 +392,7 @@ func (kv *EtcdKV) MultiSave(kvs map[string]string) error {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxStringValueSizeAndWarn(kvs)
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
@ -404,6 +411,7 @@ func (kv *EtcdKV) MultiSaveBytes(kvs map[string][]byte) error {
|
||||
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
_, err := kv.client.Txn(ctx).If().Then(ops...).Commit()
|
||||
CheckElapseAndWarn(start, "Slow etcd operation multi save", zap.Strings("keys", keys))
|
||||
return err
|
||||
@ -685,3 +693,31 @@ func CheckElapseAndWarn(start time.Time, message string, fields ...zap.Field) bo
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func CheckValueSizeAndWarn(key string, value interface{}) bool {
|
||||
size := binary.Size(value)
|
||||
if size > 102400 {
|
||||
log.Warn("value size large than 100kb", zap.String("key", key), zap.Int("value_size(kb)", size/1024))
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func CheckTnxBytesValueSizeAndWarn(kvs map[string][]byte) bool {
|
||||
var hasWarn bool
|
||||
for key, value := range kvs {
|
||||
if CheckValueSizeAndWarn(key, value) {
|
||||
hasWarn = true
|
||||
}
|
||||
}
|
||||
return hasWarn
|
||||
}
|
||||
|
||||
func CheckTnxStringValueSizeAndWarn(kvs map[string]string) bool {
|
||||
newKvs := make(map[string][]byte, len(kvs))
|
||||
for key, value := range kvs {
|
||||
newKvs[key] = []byte(value)
|
||||
}
|
||||
|
||||
return CheckTnxBytesValueSizeAndWarn(newKvs)
|
||||
}
|
||||
|
@ -909,3 +909,38 @@ func TestElapse(t *testing.T) {
|
||||
isElapse = etcdkv.CheckElapseAndWarn(start, "err message")
|
||||
assert.Equal(t, isElapse, true)
|
||||
}
|
||||
|
||||
func TestCheckValueSizeAndWarn(t *testing.T) {
|
||||
ret := etcdkv.CheckValueSizeAndWarn("k", "v")
|
||||
assert.False(t, ret)
|
||||
|
||||
v := make([]byte, 1024000)
|
||||
ret = etcdkv.CheckValueSizeAndWarn("k", v)
|
||||
assert.True(t, ret)
|
||||
}
|
||||
|
||||
func TestCheckTnxBytesValueSizeAndWarn(t *testing.T) {
|
||||
kvs := make(map[string][]byte, 0)
|
||||
kvs["k"] = []byte("v")
|
||||
ret := etcdkv.CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
assert.False(t, ret)
|
||||
|
||||
kvs["k"] = make([]byte, 1024000)
|
||||
ret = etcdkv.CheckTnxBytesValueSizeAndWarn(kvs)
|
||||
assert.True(t, ret)
|
||||
}
|
||||
|
||||
func TestCheckTnxStringValueSizeAndWarn(t *testing.T) {
|
||||
kvs := make(map[string]string, 0)
|
||||
kvs["k"] = "v"
|
||||
ret := etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
||||
assert.False(t, ret)
|
||||
|
||||
bytes := make([]byte, 1024000)
|
||||
for i := 0; i < 1024000; i++ {
|
||||
bytes[i] = byte('a')
|
||||
}
|
||||
kvs["k"] = string(bytes)
|
||||
ret = etcdkv.CheckTnxStringValueSizeAndWarn(kvs)
|
||||
assert.True(t, ret)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user