Rewrite NewTSOKVBase

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2020-11-16 12:37:46 +08:00 committed by yefu.chen
parent d59f6ac6ca
commit 9d3a21a9c9
18 changed files with 119 additions and 51 deletions

View File

@ -32,7 +32,7 @@ func main() {
etcdPort, _ := gparams.GParams.Load("etcd.port")
etcdAddr := etcdAddress + ":" + etcdPort
etcdRootPath, _ := gparams.GParams.Load("etcd.rootpath")
svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, etcdRootPath, []string{etcdAddr})
svr, err := master.CreateServer(ctx, etcdRootPath, etcdRootPath, []string{etcdAddr})
if err != nil {
log.Print("create server failed", zap.Error(err))
}

View File

@ -36,7 +36,7 @@ func TestMaster_CollectionTask(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(10002)
assert.Nil(t, err)

View File

@ -34,7 +34,7 @@ func TestMaster_CreateCollection(t *testing.T) {
_, err = etcdCli.Delete(ctx, "/test/root", clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(10001)
assert.Nil(t, err)

View File

@ -16,8 +16,8 @@ type GlobalIDAllocator struct {
var allocator *GlobalIDAllocator
func Init() {
InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
func Init(etcdAddr []string, rootPath string) {
InitGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "gid"))
}
func InitGlobalIDAllocator(key string, base kvutil.Base) {

View File

@ -17,7 +17,14 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr := "127.0.0.1:" + etcdPort
GIdAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "gid"))
exitCode := m.Run()
os.Exit(exitCode)
}

View File

@ -72,15 +72,15 @@ func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV {
return kvBase
}
func Init() {
func Init(etcdAddr []string, rootPath string) {
rand.Seed(time.Now().UnixNano())
id.Init()
tso.Init()
id.Init(etcdAddr, rootPath)
tso.Init(etcdAddr, rootPath)
}
// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, kvRootPath string, metaRootPath, tsoRootPath string, etcdAddr []string) (*Master, error) {
Init()
func CreateServer(ctx context.Context, kvRootPath, metaRootPath string, etcdAddr []string) (*Master, error) {
Init(etcdAddr, kvRootPath)
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: etcdAddr})
if err != nil {

View File

@ -38,7 +38,7 @@ func TestMaster_Partition(t *testing.T) {
assert.Nil(t, err)
port := 10000 + rand.Intn(1000)
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", "/test/root/meta/tso", []string{etcdAddr})
svr, err := CreateServer(ctx, "/test/root/kv", "/test/root/meta", []string{etcdAddr})
assert.Nil(t, err)
err = svr.Run(int64(port))
assert.Nil(t, err)

View File

@ -1,6 +1,8 @@
package master
import "math/rand"
import (
"github.com/zilliztech/milvus-distributed/internal/master/id"
)
type ddRequestScheduler struct {
reqQueue chan task
@ -21,7 +23,6 @@ func (rs *ddRequestScheduler) Enqueue(task task) error {
return nil
}
//TODO, allocGlobalID
func allocGlobalID() (UniqueID, error) {
return rand.Int63(), nil
return id.AllocOne()
}

View File

@ -37,8 +37,8 @@ type GlobalTSOAllocator struct {
var allocator *GlobalTSOAllocator
func Init() {
InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
func Init(etcdAddr []string, rootPath string) {
InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase(etcdAddr, rootPath, "tso"))
}
func InitGlobalTsoAllocator(key string, base kvutil.Base) {

View File

@ -18,7 +18,13 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr := "127.0.0.1:" + etcdPort
GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddr}, "/test/root/kv", "tso"))
exitCode := m.Run()
os.Exit(exitCode)

View File

@ -41,9 +41,8 @@ func startMaster(ctx context.Context) {
rootPath := conf.Config.Etcd.Rootpath
kvRootPath := path.Join(rootPath, "kv")
metaRootPath := path.Join(rootPath, "meta")
tsoRootPath := path.Join(rootPath, "timestamp")
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, tsoRootPath, []string{etcdAddr})
svr, err := master.CreateServer(ctx, kvRootPath, metaRootPath, []string{etcdAddr})
masterServer = svr
if err != nil {
log.Print("create server failed", zap.Error(err))

View File

@ -206,6 +206,7 @@ func (container *colSegContainer) getSegmentStatistics() *internalpb.QueryNodeSe
}
statisticData = append(statisticData, &stat)
segment.recentlyModified = false
}
return &internalpb.QueryNodeSegStats{

View File

@ -56,6 +56,20 @@ func (iNode *insertNode) Operate(in []*Msg) []*Msg {
insertData.insertIDs[task.SegmentID] = append(insertData.insertIDs[task.SegmentID], task.RowIDs...)
insertData.insertTimestamps[task.SegmentID] = append(insertData.insertTimestamps[task.SegmentID], task.Timestamps...)
insertData.insertRecords[task.SegmentID] = append(insertData.insertRecords[task.SegmentID], task.RowData...)
// check if segment exists, if not, create this segment
if !(*iNode.container).hasSegment(task.SegmentID) {
collection, err := (*iNode.container).getCollectionByName(task.CollectionName)
if err != nil {
log.Println(err)
continue
}
err = (*iNode.container).addSegment(task.SegmentID, task.PartitionTag, collection.ID())
if err != nil {
log.Println(err)
continue
}
}
}
// 2. do preInsert

View File

@ -28,7 +28,7 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
}
// update service time
stNode.node.tSafe = serviceTimeMsg.timeRange.timestampMax
stNode.node.tSafe.setTSafe(serviceTimeMsg.timeRange.timestampMax)
return nil
}

View File

@ -14,6 +14,7 @@ import "C"
import (
"context"
"sync"
)
type QueryNode struct {
@ -22,7 +23,7 @@ type QueryNode struct {
QueryNodeID uint64
pulsarURL string
tSafe Timestamp
tSafe tSafe
container *container
@ -32,6 +33,16 @@ type QueryNode struct {
statsService *statsService
}
type tSafe interface {
getTSafe() Timestamp
setTSafe(t Timestamp)
}
type serviceTime struct {
tSafeMu sync.Mutex
time Timestamp
}
func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *QueryNode {
segmentsMap := make(map[int64]*Segment)
collections := make([]*Collection, 0)
@ -41,13 +52,15 @@ func NewQueryNode(ctx context.Context, queryNodeID uint64, pulsarURL string) *Qu
segments: segmentsMap,
}
var tSafe tSafe = &serviceTime{}
return &QueryNode{
ctx: ctx,
QueryNodeID: queryNodeID,
pulsarURL: pulsarURL,
tSafe: 0,
tSafe: tSafe,
container: &container,
@ -73,3 +86,15 @@ func (node *QueryNode) Start() {
func (node *QueryNode) Close() {
// TODO: close services
}
func (st *serviceTime) getTSafe() Timestamp {
st.tSafeMu.Lock()
defer st.tSafeMu.Unlock()
return st.time
}
func (st *serviceTime) setTSafe(t Timestamp) {
st.tSafeMu.Lock()
st.time = t
st.tSafeMu.Unlock()
}

View File

@ -151,6 +151,7 @@ func (s *Segment) segmentInsert(offset int64, entityIDs *[]UniqueID, timestamps
return errors.New("Insert failed, error code = " + strconv.Itoa(int(status)))
}
s.recentlyModified = true
return nil
}

View File

@ -3,6 +3,7 @@ package reader
import (
"context"
"fmt"
"log"
"strconv"
"time"
@ -13,35 +14,55 @@ import (
type statsService struct {
ctx context.Context
msgStream *msgstream.PulsarMsgStream
pulsarURL string
msgStream *msgstream.MsgStream
container *container
}
func newStatsService(ctx context.Context, container *container, pulsarAddress string) *statsService {
// TODO: add pulsar message stream init
func newStatsService(ctx context.Context, container *container, pulsarURL string) *statsService {
return &statsService{
ctx: ctx,
pulsarURL: pulsarURL,
msgStream: nil,
container: container,
}
}
func (sService *statsService) start() {
sleepMillisecondTime := 1000
const (
receiveBufSize = 1024
sleepMillisecondTime = 1000
)
// start pulsar
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarCient(sService.pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
sService.msgStream = &statsMsgStream
(*sService.msgStream).Start()
// start service
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
select {
case <-sService.ctx.Done():
return
default:
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
case <-time.After(sleepMillisecondTime * time.Millisecond):
sService.sendSegmentStatistic()
}
}
}
func (sService *statsService) sendSegmentStatistic() {
var statisticData = (*sService.container).getSegmentStatistics()
statisticData := (*sService.container).getSegmentStatistics()
// fmt.Println("Publish segment statistic")
// fmt.Println(statisticData)
@ -49,5 +70,15 @@ func (sService *statsService) sendSegmentStatistic() {
}
func (sService *statsService) publicStatistic(statistic *internalpb.QueryNodeSegStats) {
// TODO: publish statistic
var msg msgstream.TsMsg = &msgstream.QueryNodeSegStatsMsg{
QueryNodeSegStats: *statistic,
}
var msgPack = msgstream.MsgPack{
Msgs: []*msgstream.TsMsg{&msg},
}
err := (*sService.msgStream).Produce(&msgPack)
if err != nil {
log.Println(err)
}
}

View File

@ -1,12 +1,10 @@
package tsoutil
import (
"fmt"
"path"
"time"
"github.com/zilliztech/milvus-distributed/internal/kv"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
"go.etcd.io/etcd/clientv3"
)
@ -27,25 +25,10 @@ func ParseTS(ts uint64) (time.Time, uint64) {
return physicalTime, logical
}
func NewTSOKVBase(subPath string) *kv.EtcdKV {
etcdAddr, err := gparams.GParams.Load("etcd.address")
if err != nil {
panic(err)
}
etcdPort, err := gparams.GParams.Load("etcd.port")
if err != nil {
panic(err)
}
etcdAddr = etcdAddr + ":" + etcdPort
fmt.Println("etcdAddr ::: ", etcdAddr)
func NewTSOKVBase(etcdAddr []string, tsoRoot, subPath string) *kv.EtcdKV {
client, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
Endpoints: etcdAddr,
DialTimeout: 5 * time.Second,
})
etcdRootPath, err := gparams.GParams.Load("etcd.rootpath")
if err != nil {
panic(err)
}
return kv.NewEtcdKV(client, path.Join(etcdRootPath, subPath))
return kv.NewEtcdKV(client, path.Join(tsoRoot, subPath))
}