package msgstream import ( "context" "errors" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" ) type MsgType = commonpb.MsgType type MarshalType = interface{} type TsMsg interface { TraceCtx() context.Context SetTraceCtx(ctx context.Context) ID() UniqueID BeginTs() Timestamp EndTs() Timestamp Type() MsgType HashKeys() []uint32 Marshal(TsMsg) (MarshalType, error) Unmarshal(MarshalType) (TsMsg, error) Position() *MsgPosition SetPosition(*MsgPosition) } type BaseMsg struct { Ctx context.Context BeginTimestamp Timestamp EndTimestamp Timestamp HashValues []uint32 MsgPosition *MsgPosition } func (bm *BaseMsg) BeginTs() Timestamp { return bm.BeginTimestamp } func (bm *BaseMsg) EndTs() Timestamp { return bm.EndTimestamp } func (bm *BaseMsg) HashKeys() []uint32 { return bm.HashValues } func (bm *BaseMsg) Position() *MsgPosition { return bm.MsgPosition } func (bm *BaseMsg) SetPosition(position *MsgPosition) { bm.MsgPosition = position } func ConvertToByteArray(input interface{}) ([]byte, error) { switch output := input.(type) { case []byte: return output, nil default: return nil, errors.New("cannot convert interface{} to []byte") } } /////////////////////////////////////////Insert////////////////////////////////////////// type InsertMsg struct { BaseMsg internalpb.InsertRequest } func (it *InsertMsg) TraceCtx() context.Context { return it.BaseMsg.Ctx } func (it *InsertMsg) SetTraceCtx(ctx context.Context) { it.BaseMsg.Ctx = ctx } func (it *InsertMsg) ID() UniqueID { return it.Base.MsgID } func (it *InsertMsg) Type() MsgType { return it.Base.MsgType } func (it *InsertMsg) Marshal(input TsMsg) (MarshalType, error) { insertMsg := input.(*InsertMsg) insertRequest := &insertMsg.InsertRequest mb, err := proto.Marshal(insertRequest) if err != nil { return nil, err } return mb, nil } func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) { insertRequest := internalpb.InsertRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &insertRequest) if err != nil { return nil, err } insertMsg := &InsertMsg{InsertRequest: insertRequest} for _, timestamp := range insertMsg.Timestamps { insertMsg.BeginTimestamp = timestamp insertMsg.EndTimestamp = timestamp break } for _, timestamp := range insertMsg.Timestamps { if timestamp > insertMsg.EndTimestamp { insertMsg.EndTimestamp = timestamp } if timestamp < insertMsg.BeginTimestamp { insertMsg.BeginTimestamp = timestamp } } return insertMsg, nil } /////////////////////////////////////////FlushCompletedMsg////////////////////////////////////////// type FlushCompletedMsg struct { BaseMsg internalpb.SegmentFlushCompletedMsg } func (fl *FlushCompletedMsg) TraceCtx() context.Context { return fl.BaseMsg.Ctx } func (fl *FlushCompletedMsg) SetTraceCtx(ctx context.Context) { fl.BaseMsg.Ctx = ctx } func (fl *FlushCompletedMsg) ID() UniqueID { return fl.Base.MsgID } func (fl *FlushCompletedMsg) Type() MsgType { return fl.Base.MsgType } func (fl *FlushCompletedMsg) Marshal(input TsMsg) (MarshalType, error) { flushCompletedMsgTask := input.(*FlushCompletedMsg) flushCompletedMsg := &flushCompletedMsgTask.SegmentFlushCompletedMsg mb, err := proto.Marshal(flushCompletedMsg) if err != nil { return nil, err } return mb, nil } func (fl *FlushCompletedMsg) Unmarshal(input MarshalType) (TsMsg, error) { flushCompletedMsg := internalpb.SegmentFlushCompletedMsg{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &flushCompletedMsg) if err != nil { return nil, err } flushCompletedMsgTask := &FlushCompletedMsg{SegmentFlushCompletedMsg: flushCompletedMsg} flushCompletedMsgTask.BeginTimestamp = flushCompletedMsgTask.Base.Timestamp flushCompletedMsgTask.EndTimestamp = flushCompletedMsgTask.Base.Timestamp return flushCompletedMsgTask, nil } /////////////////////////////////////////Flush////////////////////////////////////////// // GOOSE TODO remove this type FlushMsg struct { BaseMsg internalpb.FlushMsg } func (fl *FlushMsg) TraceCtx() context.Context { return fl.BaseMsg.Ctx } func (fl *FlushMsg) SetTraceCtx(ctx context.Context) { fl.BaseMsg.Ctx = ctx } func (fl *FlushMsg) ID() UniqueID { return fl.Base.MsgID } func (fl *FlushMsg) Type() MsgType { return fl.Base.MsgType } func (fl *FlushMsg) Marshal(input TsMsg) (MarshalType, error) { flushMsgTask := input.(*FlushMsg) flushMsg := &flushMsgTask.FlushMsg mb, err := proto.Marshal(flushMsg) if err != nil { return nil, err } return mb, nil } func (fl *FlushMsg) Unmarshal(input MarshalType) (TsMsg, error) { flushMsg := internalpb.FlushMsg{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &flushMsg) if err != nil { return nil, err } flushMsgTask := &FlushMsg{FlushMsg: flushMsg} flushMsgTask.BeginTimestamp = flushMsgTask.Base.Timestamp flushMsgTask.EndTimestamp = flushMsgTask.Base.Timestamp return flushMsgTask, nil } /////////////////////////////////////////Delete////////////////////////////////////////// type DeleteMsg struct { BaseMsg internalpb.DeleteRequest } func (dt *DeleteMsg) TraceCtx() context.Context { return dt.BaseMsg.Ctx } func (dt *DeleteMsg) SetTraceCtx(ctx context.Context) { dt.BaseMsg.Ctx = ctx } func (dt *DeleteMsg) ID() UniqueID { return dt.Base.MsgID } func (dt *DeleteMsg) Type() MsgType { return dt.Base.MsgType } func (dt *DeleteMsg) Marshal(input TsMsg) (MarshalType, error) { deleteMsg := input.(*DeleteMsg) deleteRequest := &deleteMsg.DeleteRequest mb, err := proto.Marshal(deleteRequest) if err != nil { return nil, err } return mb, nil } func (dt *DeleteMsg) Unmarshal(input MarshalType) (TsMsg, error) { deleteRequest := internalpb.DeleteRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &deleteRequest) if err != nil { return nil, err } deleteMsg := &DeleteMsg{DeleteRequest: deleteRequest} for _, timestamp := range deleteMsg.Timestamps { deleteMsg.BeginTimestamp = timestamp deleteMsg.EndTimestamp = timestamp break } for _, timestamp := range deleteMsg.Timestamps { if timestamp > deleteMsg.EndTimestamp { deleteMsg.EndTimestamp = timestamp } if timestamp < deleteMsg.BeginTimestamp { deleteMsg.BeginTimestamp = timestamp } } return deleteMsg, nil } /////////////////////////////////////////Search////////////////////////////////////////// type SearchMsg struct { BaseMsg internalpb.SearchRequest } func (st *SearchMsg) TraceCtx() context.Context { return st.BaseMsg.Ctx } func (st *SearchMsg) SetTraceCtx(ctx context.Context) { st.BaseMsg.Ctx = ctx } func (st *SearchMsg) ID() UniqueID { return st.Base.MsgID } func (st *SearchMsg) Type() MsgType { return st.Base.MsgType } func (st *SearchMsg) Marshal(input TsMsg) (MarshalType, error) { searchTask := input.(*SearchMsg) searchRequest := &searchTask.SearchRequest mb, err := proto.Marshal(searchRequest) if err != nil { return nil, err } return mb, nil } func (st *SearchMsg) Unmarshal(input MarshalType) (TsMsg, error) { searchRequest := internalpb.SearchRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &searchRequest) if err != nil { return nil, err } searchMsg := &SearchMsg{SearchRequest: searchRequest} searchMsg.BeginTimestamp = searchMsg.Base.Timestamp searchMsg.EndTimestamp = searchMsg.Base.Timestamp return searchMsg, nil } /////////////////////////////////////////SearchResult////////////////////////////////////////// type SearchResultMsg struct { BaseMsg internalpb.SearchResults } func (srt *SearchResultMsg) TraceCtx() context.Context { return srt.BaseMsg.Ctx } func (srt *SearchResultMsg) SetTraceCtx(ctx context.Context) { srt.BaseMsg.Ctx = ctx } func (srt *SearchResultMsg) ID() UniqueID { return srt.Base.MsgID } func (srt *SearchResultMsg) Type() MsgType { return srt.Base.MsgType } func (srt *SearchResultMsg) Marshal(input TsMsg) (MarshalType, error) { searchResultTask := input.(*SearchResultMsg) searchResultRequest := &searchResultTask.SearchResults mb, err := proto.Marshal(searchResultRequest) if err != nil { return nil, err } return mb, nil } func (srt *SearchResultMsg) Unmarshal(input MarshalType) (TsMsg, error) { searchResultRequest := internalpb.SearchResults{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &searchResultRequest) if err != nil { return nil, err } searchResultMsg := &SearchResultMsg{SearchResults: searchResultRequest} searchResultMsg.BeginTimestamp = searchResultMsg.Base.Timestamp searchResultMsg.EndTimestamp = searchResultMsg.Base.Timestamp return searchResultMsg, nil } /////////////////////////////////////////TimeTick////////////////////////////////////////// type TimeTickMsg struct { BaseMsg internalpb.TimeTickMsg } func (tst *TimeTickMsg) TraceCtx() context.Context { return tst.BaseMsg.Ctx } func (tst *TimeTickMsg) SetTraceCtx(ctx context.Context) { tst.BaseMsg.Ctx = ctx } func (tst *TimeTickMsg) ID() UniqueID { return tst.Base.MsgID } func (tst *TimeTickMsg) Type() MsgType { return tst.Base.MsgType } func (tst *TimeTickMsg) Marshal(input TsMsg) (MarshalType, error) { timeTickTask := input.(*TimeTickMsg) timeTick := &timeTickTask.TimeTickMsg mb, err := proto.Marshal(timeTick) if err != nil { return nil, err } return mb, nil } func (tst *TimeTickMsg) Unmarshal(input MarshalType) (TsMsg, error) { timeTickMsg := internalpb.TimeTickMsg{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &timeTickMsg) if err != nil { return nil, err } timeTick := &TimeTickMsg{TimeTickMsg: timeTickMsg} timeTick.BeginTimestamp = timeTick.Base.Timestamp timeTick.EndTimestamp = timeTick.Base.Timestamp return timeTick, nil } /////////////////////////////////////////QueryNodeStats////////////////////////////////////////// // GOOSE TODO: remove QueryNodeStats type QueryNodeStatsMsg struct { BaseMsg internalpb.QueryNodeStats } func (qs *QueryNodeStatsMsg) TraceCtx() context.Context { return qs.BaseMsg.Ctx } func (qs *QueryNodeStatsMsg) SetTraceCtx(ctx context.Context) { qs.BaseMsg.Ctx = ctx } func (qs *QueryNodeStatsMsg) ID() UniqueID { return qs.Base.MsgID } func (qs *QueryNodeStatsMsg) Type() MsgType { return qs.Base.MsgType } func (qs *QueryNodeStatsMsg) Marshal(input TsMsg) (MarshalType, error) { queryNodeSegStatsTask := input.(*QueryNodeStatsMsg) queryNodeSegStats := &queryNodeSegStatsTask.QueryNodeStats mb, err := proto.Marshal(queryNodeSegStats) if err != nil { return nil, err } return mb, nil } func (qs *QueryNodeStatsMsg) Unmarshal(input MarshalType) (TsMsg, error) { queryNodeSegStats := internalpb.QueryNodeStats{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &queryNodeSegStats) if err != nil { return nil, err } queryNodeSegStatsMsg := &QueryNodeStatsMsg{QueryNodeStats: queryNodeSegStats} return queryNodeSegStatsMsg, nil } /////////////////////////////////////////SegmentStatisticsMsg////////////////////////////////////////// type SegmentStatisticsMsg struct { BaseMsg internalpb.SegmentStatistics } func (ss *SegmentStatisticsMsg) TraceCtx() context.Context { return ss.BaseMsg.Ctx } func (ss *SegmentStatisticsMsg) SetTraceCtx(ctx context.Context) { ss.BaseMsg.Ctx = ctx } func (ss *SegmentStatisticsMsg) ID() UniqueID { return ss.Base.MsgID } func (ss *SegmentStatisticsMsg) Type() MsgType { return ss.Base.MsgType } func (ss *SegmentStatisticsMsg) Marshal(input TsMsg) (MarshalType, error) { segStatsTask := input.(*SegmentStatisticsMsg) segStats := &segStatsTask.SegmentStatistics mb, err := proto.Marshal(segStats) if err != nil { return nil, err } return mb, nil } func (ss *SegmentStatisticsMsg) Unmarshal(input MarshalType) (TsMsg, error) { segStats := internalpb.SegmentStatistics{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &segStats) if err != nil { return nil, err } segStatsMsg := &SegmentStatisticsMsg{SegmentStatistics: segStats} return segStatsMsg, nil } ///////////////////////////////////////////Key2Seg////////////////////////////////////////// //type Key2SegMsg struct { // BaseMsg // internalpb.Key2SegMsg //} // //func (k2st *Key2SegMsg) Type() MsgType { // return //} /////////////////////////////////////////CreateCollection////////////////////////////////////////// type CreateCollectionMsg struct { BaseMsg internalpb.CreateCollectionRequest } func (cc *CreateCollectionMsg) TraceCtx() context.Context { return cc.BaseMsg.Ctx } func (cc *CreateCollectionMsg) SetTraceCtx(ctx context.Context) { cc.BaseMsg.Ctx = ctx } func (cc *CreateCollectionMsg) ID() UniqueID { return cc.Base.MsgID } func (cc *CreateCollectionMsg) Type() MsgType { return cc.Base.MsgType } func (cc *CreateCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { createCollectionMsg := input.(*CreateCollectionMsg) createCollectionRequest := &createCollectionMsg.CreateCollectionRequest mb, err := proto.Marshal(createCollectionRequest) if err != nil { return nil, err } return mb, nil } func (cc *CreateCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { createCollectionRequest := internalpb.CreateCollectionRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &createCollectionRequest) if err != nil { return nil, err } createCollectionMsg := &CreateCollectionMsg{CreateCollectionRequest: createCollectionRequest} createCollectionMsg.BeginTimestamp = createCollectionMsg.Base.Timestamp createCollectionMsg.EndTimestamp = createCollectionMsg.Base.Timestamp return createCollectionMsg, nil } /////////////////////////////////////////DropCollection////////////////////////////////////////// type DropCollectionMsg struct { BaseMsg internalpb.DropCollectionRequest } func (dc *DropCollectionMsg) TraceCtx() context.Context { return dc.BaseMsg.Ctx } func (dc *DropCollectionMsg) SetTraceCtx(ctx context.Context) { dc.BaseMsg.Ctx = ctx } func (dc *DropCollectionMsg) ID() UniqueID { return dc.Base.MsgID } func (dc *DropCollectionMsg) Type() MsgType { return dc.Base.MsgType } func (dc *DropCollectionMsg) Marshal(input TsMsg) (MarshalType, error) { dropCollectionMsg := input.(*DropCollectionMsg) dropCollectionRequest := &dropCollectionMsg.DropCollectionRequest mb, err := proto.Marshal(dropCollectionRequest) if err != nil { return nil, err } return mb, nil } func (dc *DropCollectionMsg) Unmarshal(input MarshalType) (TsMsg, error) { dropCollectionRequest := internalpb.DropCollectionRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &dropCollectionRequest) if err != nil { return nil, err } dropCollectionMsg := &DropCollectionMsg{DropCollectionRequest: dropCollectionRequest} dropCollectionMsg.BeginTimestamp = dropCollectionMsg.Base.Timestamp dropCollectionMsg.EndTimestamp = dropCollectionMsg.Base.Timestamp return dropCollectionMsg, nil } /////////////////////////////////////////CreatePartition////////////////////////////////////////// type CreatePartitionMsg struct { BaseMsg internalpb.CreatePartitionRequest } func (cp *CreatePartitionMsg) TraceCtx() context.Context { return cp.BaseMsg.Ctx } func (cp *CreatePartitionMsg) SetTraceCtx(ctx context.Context) { cp.BaseMsg.Ctx = ctx } func (cp *CreatePartitionMsg) ID() UniqueID { return cp.Base.MsgID } func (cp *CreatePartitionMsg) Type() MsgType { return cp.Base.MsgType } func (cp *CreatePartitionMsg) Marshal(input TsMsg) (MarshalType, error) { createPartitionMsg := input.(*CreatePartitionMsg) createPartitionRequest := &createPartitionMsg.CreatePartitionRequest mb, err := proto.Marshal(createPartitionRequest) if err != nil { return nil, err } return mb, nil } func (cp *CreatePartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { createPartitionRequest := internalpb.CreatePartitionRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &createPartitionRequest) if err != nil { return nil, err } createPartitionMsg := &CreatePartitionMsg{CreatePartitionRequest: createPartitionRequest} createPartitionMsg.BeginTimestamp = createPartitionMsg.Base.Timestamp createPartitionMsg.EndTimestamp = createPartitionMsg.Base.Timestamp return createPartitionMsg, nil } /////////////////////////////////////////DropPartition////////////////////////////////////////// type DropPartitionMsg struct { BaseMsg internalpb.DropPartitionRequest } func (dp *DropPartitionMsg) TraceCtx() context.Context { return dp.BaseMsg.Ctx } func (dp *DropPartitionMsg) SetTraceCtx(ctx context.Context) { dp.BaseMsg.Ctx = ctx } func (dp *DropPartitionMsg) ID() UniqueID { return dp.Base.MsgID } func (dp *DropPartitionMsg) Type() MsgType { return dp.Base.MsgType } func (dp *DropPartitionMsg) Marshal(input TsMsg) (MarshalType, error) { dropPartitionMsg := input.(*DropPartitionMsg) dropPartitionRequest := &dropPartitionMsg.DropPartitionRequest mb, err := proto.Marshal(dropPartitionRequest) if err != nil { return nil, err } return mb, nil } func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) { dropPartitionRequest := internalpb.DropPartitionRequest{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &dropPartitionRequest) if err != nil { return nil, err } dropPartitionMsg := &DropPartitionMsg{DropPartitionRequest: dropPartitionRequest} dropPartitionMsg.BeginTimestamp = dropPartitionMsg.Base.Timestamp dropPartitionMsg.EndTimestamp = dropPartitionMsg.Base.Timestamp return dropPartitionMsg, nil } /////////////////////////////////////////LoadIndex////////////////////////////////////////// type LoadIndexMsg struct { BaseMsg internalpb.LoadIndex } func (lim *LoadIndexMsg) TraceCtx() context.Context { return lim.BaseMsg.Ctx } func (lim *LoadIndexMsg) SetTraceCtx(ctx context.Context) { lim.BaseMsg.Ctx = ctx } func (lim *LoadIndexMsg) ID() UniqueID { return lim.Base.MsgID } func (lim *LoadIndexMsg) Type() MsgType { return lim.Base.MsgType } func (lim *LoadIndexMsg) Marshal(input TsMsg) (MarshalType, error) { loadIndexMsg := input.(*LoadIndexMsg) loadIndexRequest := &loadIndexMsg.LoadIndex mb, err := proto.Marshal(loadIndexRequest) if err != nil { return nil, err } return mb, nil } func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) { loadIndexRequest := internalpb.LoadIndex{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &loadIndexRequest) if err != nil { return nil, err } loadIndexMsg := &LoadIndexMsg{LoadIndex: loadIndexRequest} return loadIndexMsg, nil } /////////////////////////////////////////SegmentInfoMsg////////////////////////////////////////// type SegmentInfoMsg struct { BaseMsg datapb.SegmentMsg } func (sim *SegmentInfoMsg) TraceCtx() context.Context { return sim.BaseMsg.Ctx } func (sim *SegmentInfoMsg) SetTraceCtx(ctx context.Context) { sim.BaseMsg.Ctx = ctx } func (sim *SegmentInfoMsg) ID() UniqueID { return sim.Base.MsgID } func (sim *SegmentInfoMsg) Type() MsgType { return sim.Base.MsgType } func (sim *SegmentInfoMsg) Marshal(input TsMsg) (MarshalType, error) { segInfoMsg := input.(*SegmentInfoMsg) mb, err := proto.Marshal(&segInfoMsg.SegmentMsg) if err != nil { return nil, err } return mb, nil } func (sim *SegmentInfoMsg) Unmarshal(input MarshalType) (TsMsg, error) { segMsg := datapb.SegmentMsg{} in, err := ConvertToByteArray(input) if err != nil { return nil, err } err = proto.Unmarshal(in, &segMsg) if err != nil { return nil, err } return &SegmentInfoMsg{ SegmentMsg: segMsg, }, nil }