mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 11:29:48 +08:00
Refine insert when send message (#27491)
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com> Co-authored-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
ac2d1bb5c2
commit
2b13078b14
@ -56,7 +56,7 @@ type mqMsgStream struct {
|
||||
closeRWMutex *sync.RWMutex
|
||||
streamCancel func()
|
||||
bufSize int64
|
||||
producerLock *sync.Mutex
|
||||
producerLock *sync.RWMutex
|
||||
consumerLock *sync.Mutex
|
||||
closed int32
|
||||
onceChan sync.Once
|
||||
@ -88,7 +88,7 @@ func NewMqMsgStream(ctx context.Context,
|
||||
bufSize: bufSize,
|
||||
receiveBuf: receiveBuf,
|
||||
streamCancel: streamCancel,
|
||||
producerLock: &sync.Mutex{},
|
||||
producerLock: &sync.RWMutex{},
|
||||
consumerLock: &sync.Mutex{},
|
||||
closeRWMutex: &sync.RWMutex{},
|
||||
closed: 0,
|
||||
@ -288,13 +288,13 @@ func (ms *mqMsgStream) Produce(msgPack *MsgPack) error {
|
||||
msg := &mqwrapper.ProducerMessage{Payload: m, Properties: map[string]string{}}
|
||||
InjectCtx(spanCtx, msg.Properties)
|
||||
|
||||
ms.producerLock.Lock()
|
||||
ms.producerLock.RLock()
|
||||
if _, err := ms.producers[channel].Send(spanCtx, msg); err != nil {
|
||||
ms.producerLock.Unlock()
|
||||
ms.producerLock.RUnlock()
|
||||
sp.RecordError(err)
|
||||
return err
|
||||
}
|
||||
ms.producerLock.Unlock()
|
||||
ms.producerLock.RUnlock()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user