2024-05-10 18:01:30 +08:00
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"encoding/json"
"fmt"
"strings"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/client/v2/column"
"github.com/milvus-io/milvus/client/v2/entity"
2024-05-22 19:15:40 +08:00
"github.com/milvus-io/milvus/client/v2/row"
2024-05-10 18:01:30 +08:00
)
// InsertOption is implemented by options that can be turned into an insert
// request for a specific collection.
type InsertOption interface {
	// CollectionName returns the name of the collection the data targets.
	CollectionName() string
	// InsertRequest converts the option into a milvuspb.InsertRequest, using
	// coll (its schema) to validate and encode the data.
	InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error)
}
// UpsertOption is implemented by options that can be turned into an upsert
// request for a specific collection.
type UpsertOption interface {
	// CollectionName returns the name of the collection the data targets.
	CollectionName() string
	// UpsertRequest converts the option into a milvuspb.UpsertRequest, using
	// coll (its schema) to validate and encode the data.
	UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error)
}
// Compile-time checks that both data option types satisfy the insert and
// upsert request interfaces. rowBasedDataOption gets CollectionName through
// its embedded *columnBasedDataOption.
var (
	_ UpsertOption = (*columnBasedDataOption)(nil)
	_ InsertOption = (*columnBasedDataOption)(nil)
	_ UpsertOption = (*rowBasedDataOption)(nil)
	_ InsertOption = (*rowBasedDataOption)(nil)
)
// columnBasedDataOption builds insert and upsert requests from column-oriented
// data.
type columnBasedDataOption struct {
	// collName and partitionName identify where the data is written; an empty
	// partitionName means the default partition is used.
	collName, partitionName string
	// columns are the data columns supplied via the constructor and With* helpers.
	columns []column.Column
}
func ( opt * columnBasedDataOption ) processInsertColumns ( colSchema * entity . Schema , columns ... column . Column ) ( [ ] * schemapb . FieldData , int , error ) {
// setup dynamic related var
isDynamic := colSchema . EnableDynamicField
// check columns and field matches
var rowSize int
mNameField := make ( map [ string ] * entity . Field )
for _ , field := range colSchema . Fields {
mNameField [ field . Name ] = field
}
mNameColumn := make ( map [ string ] column . Column )
var dynamicColumns [ ] column . Column
for _ , col := range columns {
_ , dup := mNameColumn [ col . Name ( ) ]
if dup {
return nil , 0 , fmt . Errorf ( "duplicated column %s found" , col . Name ( ) )
}
l := col . Len ( )
if rowSize == 0 {
rowSize = l
2024-05-22 19:15:40 +08:00
} else if rowSize != l {
return nil , 0 , errors . New ( "column size not match" )
2024-05-10 18:01:30 +08:00
}
field , has := mNameField [ col . Name ( ) ]
if ! has {
if ! isDynamic {
return nil , 0 , fmt . Errorf ( "field %s does not exist in collection %s" , col . Name ( ) , colSchema . CollectionName )
}
// add to dynamic column list for further processing
dynamicColumns = append ( dynamicColumns , col )
continue
}
mNameColumn [ col . Name ( ) ] = col
if col . Type ( ) != field . DataType {
2024-06-07 14:47:52 +08:00
return nil , 0 , fmt . Errorf ( "param column %s has type %v but collection field definition is %v" , col . Name ( ) , col . Type ( ) , field . DataType )
2024-05-10 18:01:30 +08:00
}
if field . DataType == entity . FieldTypeFloatVector || field . DataType == entity . FieldTypeBinaryVector {
dim := 0
switch column := col . ( type ) {
case * column . ColumnFloatVector :
dim = column . Dim ( )
case * column . ColumnBinaryVector :
dim = column . Dim ( )
}
if fmt . Sprintf ( "%d" , dim ) != field . TypeParams [ entity . TypeParamDim ] {
return nil , 0 , fmt . Errorf ( "params column %s vector dim %d not match collection definition, which has dim of %s" , field . Name , dim , field . TypeParams [ entity . TypeParamDim ] )
}
}
}
// check all fixed field pass value
for _ , field := range colSchema . Fields {
_ , has := mNameColumn [ field . Name ]
if ! has &&
! field . AutoID && ! field . IsDynamic {
return nil , 0 , fmt . Errorf ( "field %s not passed" , field . Name )
}
}
fieldsData := make ( [ ] * schemapb . FieldData , 0 , len ( mNameColumn ) + 1 )
for _ , fixedColumn := range mNameColumn {
fieldsData = append ( fieldsData , fixedColumn . FieldData ( ) )
}
if len ( dynamicColumns ) > 0 {
// use empty column name here
col , err := opt . mergeDynamicColumns ( "" , rowSize , dynamicColumns )
if err != nil {
return nil , 0 , err
}
fieldsData = append ( fieldsData , col )
}
return fieldsData , rowSize , nil
}
// mergeDynamicColumns packs columns that have no matching schema field into a
// single JSON FieldData flagged IsDynamic and named dynamicName. For each row
// index in [0, rowSize) it marshals one JSON object mapping every column's name
// to that column's value at the row.
//
// An error from reading a column value is returned, annotated with the column
// name and row index, instead of being ignored. An error from JSON marshaling
// is returned as-is.
func (opt *columnBasedDataOption) mergeDynamicColumns(dynamicName string, rowSize int, columns []column.Column) (*schemapb.FieldData, error) {
	values := make([][]byte, 0, rowSize)
	for i := 0; i < rowSize; i++ {
		entry := make(map[string]any, len(columns))
		for _, col := range columns {
			v, err := col.Get(i)
			if err != nil {
				return nil, fmt.Errorf("read dynamic column %s at row %d: %w", col.Name(), i, err)
			}
			entry[col.Name()] = v
		}
		bs, err := json.Marshal(entry)
		if err != nil {
			return nil, err
		}
		values = append(values, bs)
	}
	return &schemapb.FieldData{
		Type:      schemapb.DataType_JSON,
		FieldName: dynamicName,
		Field: &schemapb.FieldData_Scalars{
			Scalars: &schemapb.ScalarField{
				Data: &schemapb.ScalarField_JsonData{
					JsonData: &schemapb.JSONArray{
						Data: values,
					},
				},
			},
		},
		IsDynamic: true,
	}, nil
}
// WithColumns appends the given columns to the option and returns the option
// for chaining. Calling it with no columns leaves the option untouched.
func (opt *columnBasedDataOption) WithColumns(columns ...column.Column) *columnBasedDataOption {
	if len(columns) > 0 {
		opt.columns = append(opt.columns, columns...)
	}
	return opt
}
// WithBoolColumn appends a bool column named colName holding data.
func (opt *columnBasedDataOption) WithBoolColumn(colName string, data []bool) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnBool(colName, data))
}
// WithInt8Column appends an int8 column named colName holding data.
func (opt *columnBasedDataOption) WithInt8Column(colName string, data []int8) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnInt8(colName, data))
}
// WithInt16Column appends an int16 column named colName holding data.
func (opt *columnBasedDataOption) WithInt16Column(colName string, data []int16) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnInt16(colName, data))
}
// WithInt32Column appends an int32 column named colName holding data.
func (opt *columnBasedDataOption) WithInt32Column(colName string, data []int32) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnInt32(colName, data))
}
// WithInt64Column appends an int64 column named colName holding data.
func (opt *columnBasedDataOption) WithInt64Column(colName string, data []int64) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnInt64(colName, data))
}
// WithVarcharColumn appends a varchar column named colName holding data.
func (opt *columnBasedDataOption) WithVarcharColumn(colName string, data []string) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnVarChar(colName, data))
}
// WithFloatVectorColumn appends a float vector column named colName with the
// given dimension, holding data.
func (opt *columnBasedDataOption) WithFloatVectorColumn(colName string, dim int, data [][]float32) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnFloatVector(colName, dim, data))
}
// WithBinaryVectorColumn appends a binary vector column named colName with the
// given dimension, holding data.
func (opt *columnBasedDataOption) WithBinaryVectorColumn(colName string, dim int, data [][]byte) *columnBasedDataOption {
	return opt.WithColumns(column.NewColumnBinaryVector(colName, dim, data))
}
// WithPartition sets the partition the data is written to and returns the
// option for chaining.
func (opt *columnBasedDataOption) WithPartition(partitionName string) *columnBasedDataOption {
	opt.partitionName = partitionName
	return opt
}
// CollectionName returns the name of the collection this option targets.
func (opt *columnBasedDataOption) CollectionName() string {
	name := opt.collName
	return name
}
// InsertRequest validates the option's columns against coll's schema and wraps
// the resulting field data and row count in an InsertRequest.
func (opt *columnBasedDataOption) InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error) {
	data, rows, err := opt.processInsertColumns(coll.Schema, opt.columns...)
	if err != nil {
		return nil, err
	}
	req := &milvuspb.InsertRequest{
		CollectionName: opt.collName,
		PartitionName:  opt.partitionName,
	}
	req.FieldsData = data
	req.NumRows = uint32(rows)
	return req, nil
}
// UpsertRequest validates the option's columns against coll's schema and wraps
// the resulting field data and row count in an UpsertRequest.
func (opt *columnBasedDataOption) UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error) {
	data, rows, err := opt.processInsertColumns(coll.Schema, opt.columns...)
	if err != nil {
		return nil, err
	}
	req := &milvuspb.UpsertRequest{
		CollectionName: opt.collName,
		PartitionName:  opt.partitionName,
	}
	req.FieldsData = data
	req.NumRows = uint32(rows)
	return req, nil
}
// NewColumnBasedInsertOption returns an insert/upsert option for collName that
// is seeded with the given columns. More columns can be added with WithColumns
// or the typed With*Column helpers.
//
// The variadic slice is copied: when a caller passes `cols...`, storing it
// directly would let later WithColumns appends write into the caller's backing
// array whenever it has spare capacity.
func NewColumnBasedInsertOption(collName string, columns ...column.Column) *columnBasedDataOption {
	return &columnBasedDataOption{
		columns:  append([]column.Column(nil), columns...),
		collName: collName,
		// leave partition name empty, using default partition
	}
}
2024-05-22 19:15:40 +08:00
// rowBasedDataOption builds insert and upsert requests from row-oriented data.
// The rows are converted into columns against the collection schema when a
// request is built.
type rowBasedDataOption struct {
	// The embedded column option supplies the collection/partition names and the
	// shared column validation logic.
	*columnBasedDataOption
	// rows are the user-supplied values handed to row.AnyToColumns.
	rows []any
}
// NewRowBasedInsertOption returns an insert/upsert option for collName whose
// data is the given rows. The partition name starts empty.
func NewRowBasedInsertOption(collName string, rows ...any) *rowBasedDataOption {
	base := &columnBasedDataOption{collName: collName}
	return &rowBasedDataOption{columnBasedDataOption: base, rows: rows}
}
// InsertRequest converts the option's rows into columns using coll's schema,
// validates them, and wraps the result in an InsertRequest.
//
// The converted columns are passed straight to processInsertColumns rather
// than stored on the embedded column option. Building a request therefore
// leaves opt unchanged and does not clobber columns set through WithColumns.
func (opt *rowBasedDataOption) InsertRequest(coll *entity.Collection) (*milvuspb.InsertRequest, error) {
	columns, err := row.AnyToColumns(opt.rows, coll.Schema)
	if err != nil {
		return nil, err
	}
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, columns...)
	if err != nil {
		return nil, err
	}
	return &milvuspb.InsertRequest{
		CollectionName: opt.collName,
		PartitionName:  opt.partitionName,
		FieldsData:     fieldsData,
		NumRows:        uint32(rowNum),
	}, nil
}
// UpsertRequest converts the option's rows into columns using coll's schema,
// validates them, and wraps the result in an UpsertRequest.
//
// The converted columns are passed straight to processInsertColumns rather
// than stored on the embedded column option. Building a request therefore
// leaves opt unchanged and does not clobber columns set through WithColumns.
func (opt *rowBasedDataOption) UpsertRequest(coll *entity.Collection) (*milvuspb.UpsertRequest, error) {
	columns, err := row.AnyToColumns(opt.rows, coll.Schema)
	if err != nil {
		return nil, err
	}
	fieldsData, rowNum, err := opt.processInsertColumns(coll.Schema, columns...)
	if err != nil {
		return nil, err
	}
	return &milvuspb.UpsertRequest{
		CollectionName: opt.collName,
		PartitionName:  opt.partitionName,
		FieldsData:     fieldsData,
		NumRows:        uint32(rowNum),
	}, nil
}
2024-05-10 18:01:30 +08:00
// DeleteOption is implemented by options that can be turned into a delete
// request.
type DeleteOption interface {
	// Request builds the milvuspb.DeleteRequest described by the option.
	Request() *milvuspb.DeleteRequest
}
// deleteOption describes a delete: the target collection, an optional
// partition, and the filter expression selecting entities to remove.
type deleteOption struct {
	collectionName, partitionName, expr string
}
// Request builds a DeleteRequest from the option's collection, partition and
// filter expression.
func (opt *deleteOption) Request() *milvuspb.DeleteRequest {
	req := &milvuspb.DeleteRequest{}
	req.CollectionName = opt.collectionName
	req.PartitionName = opt.partitionName
	req.Expr = opt.expr
	return req
}
// WithExpr sets the raw filter expression used to select entities to delete,
// replacing any expression set earlier.
func (opt *deleteOption) WithExpr(expr string) *deleteOption {
	opt.expr = expr
	return opt
}
// WithInt64IDs sets the filter expression to `fieldName in [id1,id2,...]`,
// replacing any expression set earlier. An empty ids list yields
// `fieldName in []`.
func (opt *deleteOption) WithInt64IDs(fieldName string, ids []int64) *deleteOption {
	var sb strings.Builder
	sb.WriteString(fieldName)
	sb.WriteString(" in [")
	for i, id := range ids {
		if i > 0 {
			sb.WriteByte(',')
		}
		fmt.Fprint(&sb, id)
	}
	sb.WriteByte(']')
	opt.expr = sb.String()
	return opt
}
// WithStringIDs sets the filter expression to `fieldName in ["id1","id2",...]`,
// replacing any expression set earlier.
//
// Each id is rendered with %q, i.e. as a double-quoted literal with Go-style
// escaping. An id containing `"` or `\` therefore cannot terminate the literal
// early and alter the expression. Plain ids produce the same text as unescaped
// quoting would.
// NOTE(review): assumes the server's expression parser accepts Go-style escape
// sequences inside string literals — confirm against the Milvus expr grammar.
func (opt *deleteOption) WithStringIDs(fieldName string, ids []string) *deleteOption {
	quoted := lo.Map(ids, func(id string, _ int) string { return fmt.Sprintf("%q", id) })
	opt.expr = fmt.Sprintf("%s in [%s]", fieldName, strings.Join(quoted, ","))
	return opt
}
// WithPartition restricts the delete to the named partition and returns the
// option for chaining.
func (opt *deleteOption) WithPartition(partitionName string) *deleteOption {
	opt.partitionName = partitionName
	return opt
}
// NewDeleteOption returns a delete option targeting collectionName. Partition
// and filter expression start empty; set them with WithPartition and
// WithExpr / WithInt64IDs / WithStringIDs.
func NewDeleteOption(collectionName string) *deleteOption {
	opt := &deleteOption{}
	opt.collectionName = collectionName
	return opt
}