milvus/cmd/tools/datameta/main.go

122 lines
3.9 KiB
Go
Raw Normal View History

package main
import (
"flag"
"fmt"
"sort"
"strings"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/zap"
)
// Command-line flags, grouped by purpose: where to read the meta from, which
// segments to keep, and how much detail to print.
var (
	// Connection settings.
	etcdAddr = flag.String("etcd", "127.0.0.1:2379", "Etcd Endpoint to connect")
	rootPath = flag.String("rootPath", "by-dev/meta/datacoord-meta/s", "Datacoord Segment root path to iterate")

	// Segment filters. The numeric filters default to 0 and the channel
	// filter defaults to the empty string.
	collectionID = flag.Int64("collection", 0, "Collection ID to filter with")
	partitionID  = flag.Int64("partition", 0, "Partition ID to filter with")
	segmentID    = flag.Int64("segment", 0, "Segment ID to filter with")
	channel      = flag.String("channel", "", "Channel name to filter with")

	// Output verbosity.
	detailBinlogs = flag.Bool("detail", false, "Display detail binlog path content")
)
// main connects to etcd, loads every value stored under rootPath, decodes each
// one as a datapb.SegmentInfo and prints the segments that pass the
// command-line filters.
//
// A numeric filter (collection, partition, segment) is only applied when its
// flag value is greater than zero; the channel filter is only applied when it
// is non-empty and matches by substring against the segment's insert channel.
func main() {
	flag.Parse()

	// Named metaKV (not etcdkv) so the imported etcdkv package is not shadowed.
	metaKV, err := etcdkv.NewEtcdKV([]string{*etcdAddr}, *rootPath)
	if err != nil {
		log.Fatal("failed to connect to etcd", zap.Error(err))
	}

	keys, values, err := metaKV.LoadWithPrefix("/")
	if err != nil {
		log.Fatal("failed to list segment meta", zap.String("rootPath", *rootPath), zap.Error(err))
	}

	for i, key := range keys {
		info := &datapb.SegmentInfo{}
		if err := proto.Unmarshal([]byte(values[i]), info); err != nil {
			// Keep going on undecodable entries, but say which key was
			// skipped instead of dropping it silently.
			log.Warn("skipping value that cannot be decoded as SegmentInfo",
				zap.String("key", key), zap.Error(err))
			continue
		}

		// Each case is one active filter that the segment fails to match.
		switch {
		case *collectionID > 0 && info.CollectionID != *collectionID:
			continue
		case *partitionID > 0 && info.PartitionID != *partitionID:
			continue
		case *segmentID > 0 && info.ID != *segmentID:
			continue
		case len(*channel) > 0 && !strings.Contains(info.InsertChannel, *channel):
			continue
		}

		printSegmentInfo(info)
	}
}
// tsPrintFormat is the time layout used when printing segment timestamps:
// date, wall-clock time with optional fractional seconds (up to three digits,
// trailing zeros dropped), and the numeric zone offset.
const tsPrintFormat = "2006-01-02 15:04:05.999 -0700"
// printSegmentInfo writes a human-readable summary of one segment to stdout:
// IDs, state, insert channel, row counts, last expire time, start/DML
// positions and the number of binlog, statslog and deltalog entries.
//
// When the -detail flag is set it additionally lists every binlog and statslog
// entry (ordered by field ID) and every delta log. Note that this sorts
// info.Binlogs and info.Statslogs in place.
func printSegmentInfo(info *datapb.SegmentInfo) {
	fmt.Println("================================================================================")
	fmt.Printf("Segment ID: %d\n", info.ID)
	fmt.Printf("Segment State:%v\n", info.State)
	fmt.Printf("Collection ID: %d\t\tPartitionID: %d\n", info.CollectionID, info.PartitionID)
	fmt.Printf("Insert Channel:%s\n", info.InsertChannel)
	fmt.Printf("Num of Rows: %d\t\tMax Row Num: %d\n", info.NumOfRows, info.MaxRowNum)

	// Only the time.Time part of ParseTS is printed; its second result is discarded.
	lastExpireTime, _ := tsoutil.ParseTS(info.LastExpireTime)
	fmt.Printf("Last Expire Time: %s\n", lastExpireTime.Format(tsPrintFormat))

	if info.StartPosition != nil {
		startTime, _ := tsoutil.ParseTS(info.StartPosition.Timestamp)
		fmt.Printf("Start Position ID: %v, time: %s\n", info.StartPosition.MsgID, startTime.Format(tsPrintFormat))
	} else {
		fmt.Println("Start Position: nil")
	}

	if info.DmlPosition != nil {
		dmlTime, _ := tsoutil.ParseTS(info.DmlPosition.Timestamp)
		// Print the DML position's own message ID. Reading it from
		// StartPosition showed the wrong ID and panicked when StartPosition
		// was nil.
		fmt.Printf("Dml Position ID: %v, time: %s\n", info.DmlPosition.MsgID, dmlTime.Format(tsPrintFormat))
	} else {
		fmt.Println("Dml Position: nil")
	}

	fmt.Printf("Binlog Nums %d\tStatsLog Nums: %d\tDeltaLog Nums:%d\n",
		len(info.Binlogs), len(info.Statslogs), len(info.Deltalogs))

	if *detailBinlogs {
		fmt.Println("**************************************")
		fmt.Println("Binlogs:")
		sort.Slice(info.Binlogs, func(i, j int) bool {
			return info.Binlogs[i].FieldID < info.Binlogs[j].FieldID
		})
		// Loop variables are not named "log" so the imported log package stays visible.
		for _, fieldBinlog := range info.Binlogs {
			fmt.Printf("Field %d: %v\n", fieldBinlog.FieldID, fieldBinlog.Binlogs)
		}

		fmt.Println("**************************************")
		fmt.Println("Statslogs:")
		sort.Slice(info.Statslogs, func(i, j int) bool {
			return info.Statslogs[i].FieldID < info.Statslogs[j].FieldID
		})
		for _, fieldStatslog := range info.Statslogs {
			fmt.Printf("Field %d: %v\n", fieldStatslog.FieldID, fieldStatslog.Binlogs)
		}

		fmt.Println("**************************************")
		fmt.Println("Delta Logs:")
		for _, deltaLog := range info.GetDeltalogs() {
			fmt.Printf("Entries: %d From: %v - To: %v\n", deltaLog.RecordEntries, deltaLog.TimestampFrom, deltaLog.TimestampTo)
			fmt.Printf("Path: %v\n", deltaLog.DeltaLogPath)
		}
	}
	fmt.Println("================================================================================")
}