mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Fix bug: data race in MsgFactory (#14839)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
92b8eddb5c
commit
a5f8bdeb42
@ -220,9 +220,19 @@ func (node *DataNode) Init() error {
|
||||
}
|
||||
Params.DataNodeCfg.Refresh()
|
||||
|
||||
m := map[string]interface{}{
|
||||
"PulsarAddress": Params.DataNodeCfg.PulsarAddress,
|
||||
"ReceiveBufSize": 1024,
|
||||
"PulsarBufSize": 1024,
|
||||
}
|
||||
|
||||
if err := node.msFactory.SetParams(m); err != nil {
|
||||
log.Warn("DataNode Init msFactory SetParams failed, use default",
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("DataNode Init",
|
||||
zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName),
|
||||
)
|
||||
zap.String("MsgChannelSubName", Params.DataNodeCfg.MsgChannelSubName))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -137,18 +137,6 @@ func (dsService *dataSyncService) close() {
|
||||
// initNodes inits a TimetickedFlowGraph
|
||||
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
|
||||
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
|
||||
|
||||
m := map[string]interface{}{
|
||||
"PulsarAddress": Params.DataNodeCfg.PulsarAddress,
|
||||
"ReceiveBufSize": 1024,
|
||||
"PulsarBufSize": 1024,
|
||||
}
|
||||
|
||||
err := dsService.msFactory.SetParams(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// initialize flush manager for DataSync Service
|
||||
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.blobKV, dsService.replica,
|
||||
flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService))
|
||||
@ -216,6 +204,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
|
||||
parallelConfig: newParallelConfig(),
|
||||
}
|
||||
|
||||
var err error
|
||||
var dmStreamNode Node
|
||||
dmStreamNode, err = newDmInputNode(dsService.ctx, vchanInfo.GetSeekPosition(), c)
|
||||
if err != nil {
|
||||
|
@ -96,7 +96,7 @@ func TestDataSyncService_newDataSyncService(te *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []*testInfo{
|
||||
{false, false, &mockMsgStreamFactory{false, true},
|
||||
{true, false, &mockMsgStreamFactory{false, true},
|
||||
0, "by-dev-rootcoord-dml-test_v0",
|
||||
0, 0, "", 0,
|
||||
0, 0, "", 0,
|
||||
|
Loading…
Reference in New Issue
Block a user