From 1596abf6e8fff46f3aa8ce16376e6fe1cddbe889 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Fri, 31 Mar 2023 20:16:24 +0800 Subject: [PATCH] Add lock for segment manager during bulk insert (#23111) /kind bug issue: #23106 Signed-off-by: Yuchen Gao --- internal/datacoord/segment_manager.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 5db3792588..ee0a3fbc42 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -292,11 +292,15 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID // allocSegmentForImport allocates one segment allocation for bulk insert. func (s *SegmentManager) allocSegmentForImport(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64, importTaskID int64) (*Allocation, error) { - // init allocation - allocation := getAllocation(requestRows) + _, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "Alloc-ImportSegment") + defer sp.End() + s.mu.Lock() + defer s.mu.Unlock() - // create new segments and add allocations to meta - // to avoid mixing up with growing segments, the segment state is "Importing" + // Init allocation. + allocation := getAllocation(requestRows) + // Create new segments and add allocations to meta. + // To avoid mixing up with growing segments, the segment state is "Importing" expireTs, err := s.genExpireTs(ctx, true) if err != nil { return nil, err