mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-29 18:38:44 +08:00
Add proxy timetick
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
a2f2284932
commit
3b5765ae47
@ -7,7 +7,7 @@ import (
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/master/mock"
|
||||
mockmaster "github.com/zilliztech/milvus-distributed/internal/master/mock"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -1,14 +1,13 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"context"
|
||||
"flag"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/conf"
|
||||
|
@ -4,15 +4,17 @@ import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/conf"
|
||||
"github.com/zilliztech/milvus-distributed/internal/reader"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, _ := context.WithCancel(context.Background())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
var yamlFile string
|
||||
flag.StringVar(&yamlFile, "yaml", "", "yaml file")
|
||||
flag.Parse()
|
||||
@ -20,9 +22,33 @@ func main() {
|
||||
fmt.Println("yaml file: ", yamlFile)
|
||||
conf.LoadConfig(yamlFile)
|
||||
|
||||
sc := make(chan os.Signal, 1)
|
||||
signal.Notify(sc,
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGINT,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGQUIT)
|
||||
|
||||
var sig os.Signal
|
||||
go func() {
|
||||
sig = <-sc
|
||||
cancel()
|
||||
}()
|
||||
|
||||
pulsarAddr := "pulsar://"
|
||||
pulsarAddr += conf.Config.Pulsar.Address
|
||||
pulsarAddr += ":"
|
||||
pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
|
||||
reader.StartQueryNode(ctx, pulsarAddr)
|
||||
|
||||
switch sig {
|
||||
case syscall.SIGTERM:
|
||||
exit(0)
|
||||
default:
|
||||
exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func exit(code int) {
|
||||
os.Exit(code)
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ func runBatchGet() {
|
||||
start := end - int32(batchOpSize)
|
||||
keys := totalKeys[start:end]
|
||||
versions := make([]uint64, batchOpSize)
|
||||
for i, _ := range versions {
|
||||
for i := range versions {
|
||||
versions[i] = uint64(numVersion)
|
||||
}
|
||||
atomic.AddInt32(&counter, 1)
|
||||
@ -152,7 +152,7 @@ func runBatchDelete() {
|
||||
keys := totalKeys[start:end]
|
||||
atomic.AddInt32(&counter, 1)
|
||||
versions := make([]uint64, batchOpSize)
|
||||
for i, _ := range versions {
|
||||
for i := range versions {
|
||||
versions[i] = uint64(numVersion)
|
||||
}
|
||||
err := store.DeleteRows(context.Background(), keys, versions)
|
||||
@ -199,8 +199,7 @@ func main() {
|
||||
|
||||
// Echo the parameters
|
||||
log.Printf("Benchmark log will write to file %s\n", logFile.Name())
|
||||
fmt.Fprint(logFile, fmt.Sprintf("Parameters: duration=%d, threads=%d, loops=%d, valueSize=%s, batchSize=%d, versions=%d\n", durationSecs, threads, loops, sizeArg, batchOpSize, numVersion))
|
||||
|
||||
fmt.Fprintf(logFile, "Parameters: duration=%d, threads=%d, loops=%d, valueSize=%s, batchSize=%d, versions=%d\n", durationSecs, threads, loops, sizeArg, batchOpSize, numVersion)
|
||||
// Init test data
|
||||
valueData = make([]byte, valueSize)
|
||||
rand.Read(valueData)
|
||||
@ -237,9 +236,8 @@ func main() {
|
||||
|
||||
setTime := setFinish.Sub(startTime).Seconds()
|
||||
bps := float64(uint64(counter)*valueSize*uint64(batchOpSize)) / setTime
|
||||
fmt.Fprint(logFile, fmt.Sprintf("Loop %d: BATCH PUT time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
|
||||
loop, setTime, counter, counter*int32(batchOpSize), bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter*int32(batchOpSize))/setTime))
|
||||
|
||||
fmt.Fprintf(logFile, "Loop %d: BATCH PUT time %.1f secs, batchs = %d, kv pairs = %d, speed = %sB/sec, %.1f operations/sec, %.1f kv/sec.\n",
|
||||
loop, setTime, counter, counter*int32(batchOpSize), bytefmt.ByteSize(uint64(bps)), float64(counter)/setTime, float64(counter*int32(batchOpSize))/setTime)
|
||||
// Record all test keys
|
||||
//totalKeyCount = keyNum
|
||||
//totalKeys = make([][]byte, totalKeyCount)
|
||||
|
@ -5,7 +5,7 @@ import (
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
|
@ -1,7 +1,7 @@
|
||||
package msgstream
|
||||
|
||||
import (
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
@ -62,6 +62,19 @@ func (it *InsertMsg) Unmarshal(input []byte) (*TsMsg, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, timestamp := range insertMsg.Timestamps {
|
||||
it.BeginTimestamp = timestamp
|
||||
it.EndTimestamp = timestamp
|
||||
break
|
||||
}
|
||||
for _, timestamp := range insertMsg.Timestamps {
|
||||
if timestamp > it.EndTimestamp {
|
||||
it.EndTimestamp = timestamp
|
||||
}
|
||||
if timestamp < it.BeginTimestamp {
|
||||
it.BeginTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
var tsMsg TsMsg = insertMsg
|
||||
return &tsMsg, nil
|
||||
}
|
||||
@ -94,6 +107,19 @@ func (dt *DeleteMsg) Unmarshal(input []byte) (*TsMsg, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, timestamp := range deleteMsg.Timestamps {
|
||||
dt.BeginTimestamp = timestamp
|
||||
dt.EndTimestamp = timestamp
|
||||
break
|
||||
}
|
||||
for _, timestamp := range deleteMsg.Timestamps {
|
||||
if timestamp > dt.EndTimestamp {
|
||||
dt.EndTimestamp = timestamp
|
||||
}
|
||||
if timestamp < dt.BeginTimestamp {
|
||||
dt.BeginTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
var tsMsg TsMsg = deleteMsg
|
||||
return &tsMsg, nil
|
||||
}
|
||||
@ -126,6 +152,8 @@ func (st *SearchMsg) Unmarshal(input []byte) (*TsMsg, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
st.BeginTimestamp = searchMsg.Timestamp
|
||||
st.EndTimestamp = searchMsg.Timestamp
|
||||
var tsMsg TsMsg = searchMsg
|
||||
return &tsMsg, nil
|
||||
}
|
||||
@ -158,6 +186,8 @@ func (srt *SearchResultMsg) Unmarshal(input []byte) (*TsMsg, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
srt.BeginTimestamp = searchResultMsg.Timestamp
|
||||
srt.EndTimestamp = searchResultMsg.Timestamp
|
||||
var tsMsg TsMsg = searchResultMsg
|
||||
return &tsMsg, nil
|
||||
}
|
||||
@ -190,6 +220,8 @@ func (tst *TimeTickMsg) Unmarshal(input []byte) (*TsMsg, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tst.BeginTimestamp = timeTick.Timestamp
|
||||
tst.EndTimestamp = timeTick.Timestamp
|
||||
var tsMsg TsMsg = timeTick
|
||||
return &tsMsg, nil
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
|
@ -5,7 +5,7 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
|
@ -2,32 +2,38 @@ package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
"google.golang.org/grpc"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type UniqueID = typeutil.UniqueID
|
||||
type Timestamp = typeutil.Timestamp
|
||||
|
||||
type Proxy struct {
|
||||
ctx context.Context
|
||||
proxyLoopCtx context.Context
|
||||
proxyLoopCancel func()
|
||||
proxyLoopWg sync.WaitGroup
|
||||
|
||||
grpcServer *grpc.Server
|
||||
masterConn *grpc.ClientConn
|
||||
masterClient masterpb.MasterClient
|
||||
taskSch *TaskScheduler
|
||||
grpcServer *grpc.Server
|
||||
masterConn *grpc.ClientConn
|
||||
masterClient masterpb.MasterClient
|
||||
taskSch *TaskScheduler
|
||||
tick *timeTick
|
||||
|
||||
idAllocator *allocator.IdAllocator
|
||||
tsoAllocator *allocator.TimestampAllocator
|
||||
|
||||
manipulationMsgStream *msgstream.PulsarMsgStream
|
||||
queryMsgStream *msgstream.PulsarMsgStream
|
||||
queryResultMsgStream *msgstream.PulsarMsgStream
|
||||
@ -39,23 +45,59 @@ type Proxy struct {
|
||||
|
||||
func CreateProxy(ctx context.Context) (*Proxy, error) {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
m := &Proxy{
|
||||
ctx: ctx,
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
p := &Proxy{
|
||||
proxyLoopCtx: ctx1,
|
||||
proxyLoopCancel: cancel,
|
||||
}
|
||||
return m, nil
|
||||
bufSize := int64(1000)
|
||||
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
|
||||
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
|
||||
p.queryResultMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
|
||||
|
||||
idAllocator, err := allocator.NewIdAllocator(p.proxyLoopCtx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.idAllocator = idAllocator
|
||||
|
||||
tsoAllocator, err := allocator.NewTimestampAllocator(p.proxyLoopCtx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.tsoAllocator = tsoAllocator
|
||||
|
||||
p.taskSch, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// AddStartCallback adds a callback in the startServer phase.
|
||||
func (s *Proxy) AddStartCallback(callbacks ...func()) {
|
||||
s.startCallbacks = append(s.startCallbacks, callbacks...)
|
||||
func (p *Proxy) AddStartCallback(callbacks ...func()) {
|
||||
p.startCallbacks = append(p.startCallbacks, callbacks...)
|
||||
}
|
||||
|
||||
func (s *Proxy) startProxy(ctx context.Context) error {
|
||||
func (p *Proxy) startProxy() error {
|
||||
err := p.connectMaster()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.manipulationMsgStream.Start()
|
||||
p.queryMsgStream.Start()
|
||||
p.queryResultMsgStream.Start()
|
||||
p.taskSch.Start()
|
||||
p.idAllocator.Start()
|
||||
p.tsoAllocator.Start()
|
||||
|
||||
// Run callbacks
|
||||
for _, cb := range s.startCallbacks {
|
||||
for _, cb := range p.startCallbacks {
|
||||
cb()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -80,43 +122,6 @@ func (p *Proxy) grpcLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) pulsarMsgStreamLoop() {
|
||||
defer p.proxyLoopWg.Done()
|
||||
p.manipulationMsgStream = &msgstream.PulsarMsgStream{}
|
||||
p.queryMsgStream = &msgstream.PulsarMsgStream{}
|
||||
// TODO: config, RepackFunc
|
||||
p.manipulationMsgStream.Start()
|
||||
p.queryMsgStream.Start()
|
||||
|
||||
ctx, cancel := context.WithCancel(p.proxyLoopCtx)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Print("proxy is closed...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) scheduleLoop() {
|
||||
defer p.proxyLoopWg.Done()
|
||||
|
||||
p.taskSch = &TaskScheduler{}
|
||||
p.taskSch.Start(p.ctx)
|
||||
defer p.taskSch.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(p.proxyLoopCtx)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Print("proxy is closed...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) connectMaster() error {
|
||||
log.Printf("Connected to master, master_addr=%s", "127.0.0.1:5053")
|
||||
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
@ -131,7 +136,9 @@ func (p *Proxy) connectMaster() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Proxy) receiveResultLoop() {
|
||||
func (p *Proxy) queryResultLoop() {
|
||||
defer p.proxyLoopWg.Done()
|
||||
|
||||
queryResultBuf := make(map[UniqueID][]*internalpb.SearchResult)
|
||||
|
||||
for {
|
||||
@ -141,7 +148,7 @@ func (p *Proxy) receiveResultLoop() {
|
||||
}
|
||||
tsMsg := msgPack.Msgs[0]
|
||||
searchResultMsg, _ := (*tsMsg).(*msgstream.SearchResultMsg)
|
||||
reqId := UniqueID(searchResultMsg.GetReqId())
|
||||
reqId := searchResultMsg.GetReqId()
|
||||
_, ok := queryResultBuf[reqId]
|
||||
if !ok {
|
||||
queryResultBuf[reqId] = make([]*internalpb.SearchResult, 0)
|
||||
@ -157,58 +164,41 @@ func (p *Proxy) receiveResultLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Proxy) queryResultLoop() {
|
||||
defer p.proxyLoopWg.Done()
|
||||
p.queryResultMsgStream = &msgstream.PulsarMsgStream{}
|
||||
// TODO: config
|
||||
p.queryResultMsgStream.Start()
|
||||
|
||||
go p.receiveResultLoop()
|
||||
|
||||
ctx, cancel := context.WithCancel(p.proxyLoopCtx)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Print("proxy is closed...")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (p *Proxy) startProxyLoop(ctx context.Context) {
|
||||
p.proxyLoopCtx, p.proxyLoopCancel = context.WithCancel(ctx)
|
||||
p.proxyLoopWg.Add(4)
|
||||
|
||||
p.connectMaster()
|
||||
|
||||
func (p *Proxy) startProxyLoop() {
|
||||
p.proxyLoopWg.Add(2)
|
||||
go p.grpcLoop()
|
||||
go p.pulsarMsgStreamLoop()
|
||||
go p.queryResultLoop()
|
||||
go p.scheduleLoop()
|
||||
|
||||
}
|
||||
|
||||
func (p *Proxy) Run() error {
|
||||
if err := p.startProxy(p.ctx); err != nil {
|
||||
if err := p.startProxy(); err != nil {
|
||||
return err
|
||||
}
|
||||
p.startProxyLoop(p.ctx)
|
||||
p.startProxyLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Proxy) stopProxyLoop() {
|
||||
if p.grpcServer != nil{
|
||||
p.proxyLoopCancel()
|
||||
|
||||
if p.grpcServer != nil {
|
||||
p.grpcServer.GracefulStop()
|
||||
}
|
||||
p.proxyLoopCancel()
|
||||
p.tsoAllocator.Close()
|
||||
p.idAllocator.Close()
|
||||
p.taskSch.Close()
|
||||
p.manipulationMsgStream.Close()
|
||||
p.queryMsgStream.Close()
|
||||
p.queryResultMsgStream.Close()
|
||||
|
||||
p.proxyLoopWg.Wait()
|
||||
}
|
||||
|
||||
// Close closes the server.
|
||||
func (p *Proxy) Close() {
|
||||
p.stopProxyLoop()
|
||||
|
||||
for _, cb := range p.closeCallbacks {
|
||||
cb()
|
||||
}
|
||||
|
@ -5,6 +5,8 @@ import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
)
|
||||
|
||||
type BaseTaskQueue struct {
|
||||
@ -150,12 +152,28 @@ type TaskScheduler struct {
|
||||
DmQueue *dmTaskQueue
|
||||
DqQueue *dqTaskQueue
|
||||
|
||||
// tsAllocator, ReqIdAllocator
|
||||
idAllocator *allocator.IdAllocator
|
||||
tsoAllocator *allocator.TimestampAllocator
|
||||
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewTaskScheduler(ctx context.Context,
|
||||
idAllocator *allocator.IdAllocator,
|
||||
tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
s := &TaskScheduler{
|
||||
idAllocator: idAllocator,
|
||||
tsoAllocator: tsoAllocator,
|
||||
ctx: ctx1,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (sched *TaskScheduler) scheduleDdTask() *task {
|
||||
return sched.DdQueue.PopUnissuedTask()
|
||||
}
|
||||
@ -204,7 +222,6 @@ func (sched *TaskScheduler) definitionLoop() {
|
||||
(*t).Notify(err)
|
||||
|
||||
sched.DdQueue.AddActiveTask(t)
|
||||
//sched.DdQueue.atLock.Unlock()
|
||||
|
||||
(*t).WaitToFinish()
|
||||
(*t).PostExecute()
|
||||
@ -287,8 +304,7 @@ func (sched *TaskScheduler) queryLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (sched *TaskScheduler) Start(ctx context.Context) error {
|
||||
sched.ctx, sched.cancel = context.WithCancel(ctx)
|
||||
func (sched *TaskScheduler) Start() error {
|
||||
sched.wg.Add(3)
|
||||
|
||||
go sched.definitionLoop()
|
||||
|
@ -3,75 +3,116 @@ package proxy
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/allocator"
|
||||
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
|
||||
"github.com/apache/pulsar-client-go/pulsar"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
)
|
||||
|
||||
type timeTick struct {
|
||||
lastTick Timestamp
|
||||
currentTick Timestamp
|
||||
interval uint64
|
||||
pulsarProducer pulsar.Producer
|
||||
peer_id int64
|
||||
ctx context.Context
|
||||
lastTick Timestamp
|
||||
currentTick Timestamp
|
||||
interval uint64
|
||||
|
||||
pulsarProducer pulsar.Producer
|
||||
|
||||
tsoAllocator *allocator.TimestampAllocator
|
||||
tickMsgStream *msgstream.PulsarMsgStream
|
||||
|
||||
peerID UniqueID
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
timer *time.Ticker
|
||||
|
||||
areRequestsDelivered func(ts Timestamp) bool
|
||||
getTimestamp func() (Timestamp, error)
|
||||
}
|
||||
|
||||
func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator) *timeTick {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
t := &timeTick{
|
||||
ctx: ctx1,
|
||||
cancel: cancel,
|
||||
tsoAllocator: tsoAllocator,
|
||||
}
|
||||
|
||||
bufSzie := int64(1000)
|
||||
t.tickMsgStream = msgstream.NewPulsarMsgStream(t.ctx, bufSzie)
|
||||
|
||||
pulsarAddress := "pulsar://localhost:6650"
|
||||
producerChannels := []string{"timeTick"}
|
||||
t.tickMsgStream.SetPulsarCient(pulsarAddress)
|
||||
t.tickMsgStream.CreatePulsarProducers(producerChannels)
|
||||
return t
|
||||
}
|
||||
|
||||
|
||||
func (tt *timeTick) tick() error {
|
||||
|
||||
if tt.lastTick == tt.currentTick {
|
||||
ts, err := tt.getTimestamp()
|
||||
ts, err := tt.tsoAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tt.currentTick = ts
|
||||
}
|
||||
|
||||
if tt.areRequestsDelivered(tt.currentTick) == false {
|
||||
return errors.New("Failed")
|
||||
return nil
|
||||
}
|
||||
tsm := internalpb.TimeTickMsg{
|
||||
MsgType: internalpb.MsgType_kTimeTick,
|
||||
PeerId: tt.peer_id,
|
||||
Timestamp: uint64(tt.currentTick),
|
||||
}
|
||||
payload, err := proto.Marshal(&tsm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tt.pulsarProducer.Send(tt.ctx, &pulsar.ProducerMessage{Payload: payload}); err != nil {
|
||||
return err
|
||||
msgPack := msgstream.MsgPack{}
|
||||
var timeTickMsg msgstream.TsMsg = &msgstream.TimeTickMsg{
|
||||
TimeTickMsg: internalpb.TimeTickMsg{
|
||||
MsgType: internalpb.MsgType_kTimeTick,
|
||||
PeerId: tt.peerID,
|
||||
Timestamp: tt.currentTick,
|
||||
},
|
||||
}
|
||||
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
|
||||
tt.tickMsgStream.Produce(&msgPack)
|
||||
tt.lastTick = tt.currentTick
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *timeTick) Restart() error {
|
||||
func (tt *timeTick) tickLoop() {
|
||||
defer tt.wg.Done()
|
||||
tt.timer = time.NewTicker(time.Millisecond * time.Duration(tt.interval))
|
||||
for {
|
||||
select {
|
||||
case <-tt.timer.C:
|
||||
if err := tt.tick(); err != nil {
|
||||
log.Printf("timeTick error")
|
||||
}
|
||||
case <-tt.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tt *timeTick) Start() error {
|
||||
tt.lastTick = 0
|
||||
ts, err := tt.getTimestamp()
|
||||
ts, err := tt.tsoAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tt.currentTick = ts
|
||||
tick := time.Tick(time.Millisecond * time.Duration(tt.interval))
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-tick:
|
||||
if err := tt.tick(); err != nil {
|
||||
log.Printf("timeTick error")
|
||||
}
|
||||
case <-tt.ctx.Done():
|
||||
tt.pulsarProducer.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
tt.tickMsgStream.Start()
|
||||
tt.wg.Add(1)
|
||||
go tt.tickLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tt *timeTick) Close() {
|
||||
if tt.timer != nil {
|
||||
tt.timer.Stop()
|
||||
}
|
||||
tt.cancel()
|
||||
tt.tickMsgStream.Close()
|
||||
tt.wg.Wait()
|
||||
}
|
||||
|
@ -28,20 +28,16 @@ func TestTimeTick(t *testing.T) {
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 4*time.Second)
|
||||
|
||||
var curTs Timestamp
|
||||
curTs = 0
|
||||
//var curTs Timestamp
|
||||
//curTs = 0
|
||||
tt := timeTick{
|
||||
interval: 200,
|
||||
pulsarProducer: producer,
|
||||
peer_id: 1,
|
||||
peerID: 1,
|
||||
ctx: ctx,
|
||||
areRequestsDelivered: func(ts Timestamp) bool { return true },
|
||||
getTimestamp: func() (Timestamp, error) {
|
||||
curTs = curTs + 100
|
||||
return curTs, nil
|
||||
},
|
||||
}
|
||||
tt.Restart()
|
||||
tt.Start()
|
||||
|
||||
ctx2, _ := context.WithTimeout(context.Background(), time.Second*2)
|
||||
isbreak := false
|
||||
|
@ -31,7 +31,7 @@ type Timestamp = typeutil.Timestamp
|
||||
|
||||
func TestInsert(t *testing.T) {
|
||||
// TODO: fix test
|
||||
return
|
||||
//return
|
||||
//ctx := context.Background()
|
||||
//var topics []string
|
||||
//topics = append(topics, "test")
|
||||
|
@ -10,7 +10,7 @@ type UniqueID = typeutil.UniqueID
|
||||
|
||||
func TestKey2Seg(t *testing.T) {
|
||||
// TODO: fix test
|
||||
return
|
||||
//return
|
||||
//
|
||||
//lookupUrl := "pulsar://localhost:6650"
|
||||
//client, err := pulsar.NewClient(pulsar.ClientOptions{
|
||||
|
Loading…
Reference in New Issue
Block a user