From 9d3a21a9c935358adecf1e3b3410c41bbb12bf53 Mon Sep 17 00:00:00 2001 From: neza2017 Date: Mon, 16 Nov 2020 12:37:46 +0800 Subject: [PATCH] Rewrite NewTSOKVBase Signed-off-by: neza2017 --- cmd/master/main.go | 2 +- internal/master/collection_task_test.go | 2 +- internal/master/grpc_service_test.go | 2 +- internal/master/id/id.go | 4 +- internal/master/id/id_test.go | 9 +++- internal/master/master.go | 10 ++-- internal/master/partition_task_test.go | 2 +- internal/master/scheduler.go | 7 +-- internal/master/tso/global_allocator.go | 4 +- internal/master/tso/global_allocator_test.go | 8 +++- internal/proxy/proxy_test.go | 3 +- internal/reader/col_seg_container.go | 1 + internal/reader/flow_graph_insert_node.go | 14 ++++++ .../reader/flow_graph_service_time_node.go | 2 +- internal/reader/query_node.go | 29 +++++++++++- internal/reader/segment.go | 1 + internal/reader/stats_service.go | 47 +++++++++++++++---- internal/util/tsoutil/tso.go | 23 ++------- 18 files changed, 119 insertions(+), 51 deletions(-) diff --git a/cmd/master/main.go b/cmd/master/main.go index 6fa759b5c8..cfcce4979c 100644 --- a/cmd/master/main.go +++ b/cmd/master/main.go @@ -32,7 +32,7 @@ func main() { etcdPort, _ := gparams.GParams.Load("etcd.port") etcdAddr := etcdAddress + ":" + etcdPort etcdRootPath, _ := gparams.GParams.Load("etcd.rootpath") - svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, etcdRootPath, []string{etcdAddr}) + svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddr}) if err != nil { log.Print("create server failed", zap.Error(err)) } diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index ab5c1aa6eb..7ffd35e461 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -36,7 +36,7 @@ func TestMaster_CollectionTask(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(10002) assert.Nil(t, err) diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 3ef81843d4..ff564f8f22 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -34,7 +34,7 @@ func TestMaster_CreateCollection(t *testing.T) { _, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix()) assert.Nil(t, err) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(10001) assert.Nil(t, err) diff --git a/internal/master/id/id.go b/internal/master/id/id.go index 931e4b38fc..c068f5b39f 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -16,8 +16,8 @@ type GlobalIDAllocator struct { var allocator *GlobalIDAllocator -func Init() { - InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) +func Init(etcdAddr []string, rootPath string) { + InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid")) } func InitGlobalIDAllocator(key string, base kvutil.Base) { diff --git a/internal/master/id/id_test.go b/internal/master/id/id_test.go index 8c31987191..6e60f684a5 100644 --- a/internal/master/id/id_test.go +++ b/internal/master/id/id_test.go @@ -17,7 +17,14 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) + + etcdPort, err := gparams.GParams.Load("etcd.port") + if err != nil { + panic(err) + } + etcdAddr := "127.0.0.1:" + etcdPort + + GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid")) exitCode := m.Run() os.Exit(exitCode) } diff --git a/internal/master/master.go b/internal/master/master.go index db13a78206..a2a3851059 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -72,15 +72,15 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV { return kvBase } -func Init() { +func Init(etcdAddr []string, rootPath string) { rand.Seed(time.Now().UnixNano()) - id.Init() - tso.Init() + id.Init(etcdAddr, rootPath) + tso.Init(etcdAddr, rootPath) } // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) { - Init() +func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) { + Init(etcdAddr, kvRootPath) etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr}) if err != nil { diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index 066522b902..2b5431479b 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -38,7 +38,7 @@ func TestMaster_Partition(t *testing.T) { assert.Nil(t, err) port := 10000 + rand.Intn(1000) - svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr}) + svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr}) assert.Nil(t, err) err = svr.Run(int64(port)) assert.Nil(t, err) diff --git a/internal/master/scheduler.go b/internal/master/scheduler.go index 031c98801d..a71fb14bcb 100644 --- a/internal/master/scheduler.go +++ b/internal/master/scheduler.go @@ -1,6 +1,8 @@ package master -import "math/rand" +import ( + "github.com/zilliztech/milvus-distributed/internal/master/id" +) type ddRequestScheduler struct { reqQueue chan task @@ -21,7 +23,6 @@ func (rs *ddRequestScheduler) Enqueue(task task) error { return nil } -//TODO, allocGlobalID func allocGlobalID() (UniqueID, error) { - return rand.Int63(), nil + return id.AllocOne() } diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index 31aba37d78..d9bd6ceead 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -37,8 +37,8 @@ type GlobalTSOAllocator struct { var allocator *GlobalTSOAllocator -func Init() { - InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) +func Init(etcdAddr []string, rootPath string) { + InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso")) } func InitGlobalTsoAllocator(key string, base kvutil.Base) { diff --git a/internal/master/tso/global_allocator_test.go b/internal/master/tso/global_allocator_test.go index 0d4d033eaa..70d318d471 100644 --- a/internal/master/tso/global_allocator_test.go +++ b/internal/master/tso/global_allocator_test.go @@ -18,7 +18,13 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) + etcdPort, err := gparams.GParams.Load("etcd.port") + if err != nil { + panic(err) + } + etcdAddr := "127.0.0.1:" + etcdPort + + GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso")) exitCode := m.Run() os.Exit(exitCode) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 85906e73c6..0bbddc32ae 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -41,9 +41,8 @@ func startMaster(ctx context.Context) { rootPath := conf.Config.Etcd.Rootpath kvRootPath := path.Join(rootPath, "kv") metaRootPath := path.Join(rootPath, "meta") - tsoRootPath := path.Join(rootPath, "timestamp") - svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, tsoRootPath, []string{etcdAddr}) + svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr}) masterServer = svr if err != nil { log.Print("create server failed", zap.Error(err)) diff --git a/internal/reader/col_seg_container.go b/internal/reader/col_seg_container.go index bb8a4993f4..bc94d4e866 100644 --- a/internal/reader/col_seg_container.go +++ b/internal/reader/col_seg_container.go @@ -206,6 +206,7 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe } statisticData = append(statisticData, &stat) + segment.recentlyModified = false } return &internalpb.QueryNodeSegStats{ diff --git a/internal/reader/flow_graph_insert_node.go b/internal/reader/flow_graph_insert_node.go index bdafa53cf2..77500545e8 100644 --- a/internal/reader/flow_graph_insert_node.go +++ b/internal/reader/flow_graph_insert_node.go @@ -56,6 +56,20 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg { insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...) insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...) insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...) + + // check if segment exists, if not, create this segment + if !(*iNode.container).hasSegment(task.SegmentID) { + collection, err := (*iNode.container).getCollectionByName(task.CollectionName) + if err != nil { + log.Println(err) + continue + } + err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID()) + if err != nil { + log.Println(err) + continue + } + } } // 2. do preInsert diff --git a/internal/reader/flow_graph_service_time_node.go b/internal/reader/flow_graph_service_time_node.go index 246d8a32be..50ca674ff8 100644 --- a/internal/reader/flow_graph_service_time_node.go +++ b/internal/reader/flow_graph_service_time_node.go @@ -28,7 +28,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg { } // update service time - stNode.node.tSafe = serviceTimeMsg.timeRange.timestampMax + stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax) return nil } diff --git a/internal/reader/query_node.go b/internal/reader/query_node.go index 0a176853af..8ec94445d2 100644 --- a/internal/reader/query_node.go +++ b/internal/reader/query_node.go @@ -14,6 +14,7 @@ import "C" import ( "context" + "sync" ) type QueryNode struct { @@ -22,7 +23,7 @@ type QueryNode struct { QueryNodeID uint64 pulsarURL string - tSafe Timestamp + tSafe tSafe container *container @@ -32,6 +33,16 @@ type QueryNode struct { statsService *statsService } +type tSafe interface { + getTSafe() Timestamp + setTSafe(t Timestamp) +} + +type serviceTime struct { + tSafeMu sync.Mutex + time Timestamp +} + func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode { segmentsMap := make(map[int64]*Segment) collections := make([]*Collection, 0) @@ -41,13 +52,15 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu segments: segmentsMap, } + var tSafe tSafe = &serviceTime{} + return &QueryNode{ ctx: ctx, QueryNodeID: queryNodeID, pulsarURL: pulsarURL, - tSafe: 0, + tSafe: tSafe, container: &container, @@ -73,3 +86,15 @@ func (node *QueryNode) Start() { func (node *QueryNode) Close() { // TODO: close services } + +func (st *serviceTime) getTSafe() Timestamp { + st.tSafeMu.Lock() + defer st.tSafeMu.Unlock() + return st.time +} + +func (st *serviceTime) setTSafe(t Timestamp) { + st.tSafeMu.Lock() + st.time = t + st.tSafeMu.Unlock() +} diff --git a/internal/reader/segment.go b/internal/reader/segment.go index 6aea60a552..f42fda8273 100644 --- a/internal/reader/segment.go +++ b/internal/reader/segment.go @@ -151,6 +151,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps return errors.New("Insert failed, error code = " + strconv.Itoa(int(status))) } + s.recentlyModified = true return nil } diff --git a/internal/reader/stats_service.go b/internal/reader/stats_service.go index 30812a3dda..0b8c0873ee 100644 --- a/internal/reader/stats_service.go +++ b/internal/reader/stats_service.go @@ -3,6 +3,7 @@ package reader import ( "context" "fmt" + "log" "strconv" "time" @@ -13,35 +14,55 @@ import ( type statsService struct { ctx context.Context - msgStream *msgstream.PulsarMsgStream + pulsarURL string + + msgStream *msgstream.MsgStream + container *container } -func newStatsService(ctx context.Context, container *container, pulsarAddress string) *statsService { - // TODO: add pulsar message stream init +func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService { return &statsService{ ctx: ctx, + pulsarURL: pulsarURL, + msgStream: nil, container: container, } } func (sService *statsService) start() { - sleepMillisecondTime := 1000 + const ( + receiveBufSize = 1024 + sleepMillisecondTime = 1000 + ) + + // start pulsar + producerChannels := []string{"statistic"} + + statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize) + statsStream.SetPulsarCient(sService.pulsarURL) + statsStream.CreatePulsarProducers(producerChannels) + + var statsMsgStream msgstream.MsgStream = statsStream + + sService.msgStream = &statsMsgStream + (*sService.msgStream).Start() + + // start service fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms") for { select { case <-sService.ctx.Done(): return - default: - time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond) + case <-time.After(sleepMillisecondTime * time.Millisecond): sService.sendSegmentStatistic() } } } func (sService *statsService) sendSegmentStatistic() { - var statisticData = (*sService.container).getSegmentStatistics() + statisticData := (*sService.container).getSegmentStatistics() // fmt.Println("Publish segment statistic") // fmt.Println(statisticData) @@ -49,5 +70,15 @@ func (sService *statsService) sendSegmentStatistic() { } func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSegStats) { - // TODO: publish statistic + var msg msgstream.TsMsg = &msgstream.QueryNodeSegStatsMsg{ + QueryNodeSegStats: *statistic, + } + + var msgPack = msgstream.MsgPack{ + Msgs: []*msgstream.TsMsg{&msg}, + } + err := (*sService.msgStream).Produce(&msgPack) + if err != nil { + log.Println(err) + } } diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index 9e23dcdfe2..5e5a8e9fe3 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go @@ -1,12 +1,10 @@ package tsoutil import ( - "fmt" "path" "time" "github.com/zilliztech/milvus-distributed/internal/kv" - gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil" "go.etcd.io/etcd/clientv3" ) @@ -27,25 +25,10 @@ func ParseTS(ts uint64) (time.Time, uint64) { return physicalTime, logical } -func NewTSOKVBase(subPath string) *kv.EtcdKV { - etcdAddr, err := gparams.GParams.Load("etcd.address") - if err != nil { - panic(err) - } - etcdPort, err := gparams.GParams.Load("etcd.port") - if err != nil { - panic(err) - } - etcdAddr = etcdAddr + ":" + etcdPort - fmt.Println("etcdAddr ::: ", etcdAddr) +func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *kv.EtcdKV { client, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, + Endpoints: etcdAddr, DialTimeout: 5 * time.Second, }) - - etcdRootPath, err := gparams.GParams.Load("etcd.rootpath") - if err != nil { - panic(err) - } - return kv.NewEtcdKV(client, path.Join(etcdRootPath, subPath)) + return kv.NewEtcdKV(client, path.Join(tsoRoot, subPath)) }