Fill the WriteHandoff field in the upgrade script (#20139)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2022-10-27 20:25:35 +08:00 committed by GitHub
parent 4848edda8d
commit 1be4d1e267
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 7 deletions

View File

@ -230,6 +230,7 @@ func combineToSegmentIndexesMeta220(segmentIndexes SegmentIndexesMeta210, indexB
CreateTime: record.GetCreateTime(),
IndexFileKeys: fileKeys,
IndexSize: buildMeta.GetSerializeSize(),
WriteHandoff: buildMeta.GetState() == commonpb.IndexState_Finished,
}
segmentIndexModels.AddRecord(segID, indexID, segmentIndexModel)
}

View File

@ -254,14 +254,17 @@ func (hd *handoff) process(segID UniqueID, front bool) {
//IndexSize: 0,
})
}
if err := hd.writeHandoffSegment(handoffTask); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("write handoff task success", zap.Int64("segID", segID))
if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
if !hd.meta.AlreadyWrittenHandoff(segID) {
if err := hd.writeHandoffSegment(handoffTask); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
}
log.Ctx(hd.ctx).Info("mark segment as write handoff success, remove task", zap.Int64("segID", segID))

View File

@ -1084,3 +1084,17 @@ func (mt *metaTable) MarkSegmentWriteHandoff(segID UniqueID) error {
}
return mt.alterSegmentIndexes(segIdxes)
}
func (mt *metaTable) AlreadyWrittenHandoff(segID UniqueID) bool {
mt.segmentIndexLock.RLock()
defer mt.segmentIndexLock.RUnlock()
if segIndexes, ok := mt.segmentIndexes[segID]; ok {
for _, segIdx := range segIndexes {
if !segIdx.WriteHandoff {
return false
}
}
}
return true
}

View File

@ -222,6 +222,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
for _, buildID := range buildIDs {
cit.indexCoordClient.indexBuilder.enqueue(buildID)
}
// If the handoff is not notified here, the segment that has been loaded will not be able to replace the index
for _, segment := range segments {
cit.indexCoordClient.handoff.enqueue(segment)
}