milvus/internal/msgstream/global_mmq.go
xige-16 51b2b45e03 Add copyright for msgStream
Signed-off-by: xige-16 <xi.ge@zilliz.com>
2021-04-19 11:30:19 +08:00

183 lines
3.9 KiB
Go

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package msgstream
import (
"errors"
"sync"
)
var Mmq *MemMQ
var once sync.Once
type MemConsumer struct {
GroupName string
ChannelName string
MsgChan chan *MsgPack
}
type MemMQ struct {
consumers map[string][]*MemConsumer
consumerMu sync.Mutex
}
func (mmq *MemMQ) CreateChannel(channelName string) error {
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
if _, ok := mmq.consumers[channelName]; !ok {
consumers := make([]*MemConsumer, 0)
mmq.consumers[channelName] = consumers
}
return nil
}
func (mmq *MemMQ) DestroyChannel(channelName string) error {
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
consumers, ok := mmq.consumers[channelName]
if ok {
// send nil to consumer so that client can close it self
for _, consumer := range consumers {
consumer.MsgChan <- nil
}
}
delete(mmq.consumers, channelName)
return nil
}
func (mmq *MemMQ) CreateConsumerGroup(groupName string, channelName string) (*MemConsumer, error) {
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
consumers, ok := mmq.consumers[channelName]
if !ok {
consumers = make([]*MemConsumer, 0)
mmq.consumers[channelName] = consumers
}
// exist?
for _, consumer := range consumers {
if consumer.GroupName == groupName {
return consumer, nil
}
}
// append new
consumer := MemConsumer{
GroupName: groupName,
ChannelName: channelName,
MsgChan: make(chan *MsgPack, 1024),
}
mmq.consumers[channelName] = append(mmq.consumers[channelName], &consumer)
return &consumer, nil
}
func (mmq *MemMQ) DestroyConsumerGroup(groupName string, channelName string) error {
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
consumers, ok := mmq.consumers[channelName]
if !ok {
return nil
}
tempConsumers := make([]*MemConsumer, 0)
for _, consumer := range consumers {
if consumer.GroupName == groupName {
// send nil to consumer so that client can close it self
consumer.MsgChan <- nil
} else {
tempConsumers = append(tempConsumers, consumer)
}
}
mmq.consumers[channelName] = tempConsumers
return nil
}
func (mmq *MemMQ) Produce(channelName string, msgPack *MsgPack) error {
if msgPack == nil {
return nil
}
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
consumers, ok := mmq.consumers[channelName]
if !ok {
return errors.New("Channel " + channelName + " doesn't exist")
}
for _, consumer := range consumers {
consumer.MsgChan <- msgPack
}
return nil
}
func (mmq *MemMQ) Broadcast(msgPack *MsgPack) error {
if msgPack == nil {
return nil
}
mmq.consumerMu.Lock()
defer mmq.consumerMu.Unlock()
for _, consumers := range mmq.consumers {
for _, consumer := range consumers {
consumer.MsgChan <- msgPack
}
}
return nil
}
func (mmq *MemMQ) Consume(groupName string, channelName string) (*MsgPack, error) {
var consumer *MemConsumer = nil
mmq.consumerMu.Lock()
consumers, ok := mmq.consumers[channelName]
if !ok {
return nil, errors.New("Channel " + channelName + " doesn't exist")
}
for _, c := range consumers {
if c.GroupName == groupName {
consumer = c
break
}
}
mmq.consumerMu.Unlock()
msg, ok := <-consumer.MsgChan
if !ok {
return nil, nil
}
return msg, nil
}
func InitMmq() error {
var err error
once.Do(func() {
Mmq = &MemMQ{
consumerMu: sync.Mutex{},
}
Mmq.consumers = make(map[string][]*MemConsumer)
})
return err
}