mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
Identify service providers based on addresses (#27907)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
This commit is contained in:
parent
8b5b33fce0
commit
6c3f29d003
@ -9,7 +9,10 @@ import (
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
const OSSDefaultAddress = "oss.aliyuncs.com"
|
||||
const (
|
||||
OSSAddressFeatureString = "aliyuncs.com"
|
||||
OSSDefaultAddress = "oss.aliyuncs.com"
|
||||
)
|
||||
|
||||
// NewMinioClient returns a minio.Client which is compatible for aliyun OSS
|
||||
func NewMinioClient(address string, opts *minio.Options) (*minio.Client, error) {
|
||||
|
@ -27,17 +27,13 @@ import (
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/mmap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/storage/aliyun"
|
||||
"github.com/milvus-io/milvus/internal/storage/gcp"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/retry"
|
||||
"github.com/milvus-io/milvus/pkg/util/timerecord"
|
||||
)
|
||||
|
||||
@ -78,73 +74,10 @@ func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManag
|
||||
}
|
||||
|
||||
func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) {
|
||||
var creds *credentials.Credentials
|
||||
newMinioFn := minio.New
|
||||
bucketLookupType := minio.BucketLookupAuto
|
||||
|
||||
if c.useVirtualHost {
|
||||
bucketLookupType = minio.BucketLookupDNS
|
||||
}
|
||||
|
||||
switch c.cloudProvider {
|
||||
case CloudProviderAliyun:
|
||||
// auto doesn't work for aliyun, so we set to dns deliberately
|
||||
bucketLookupType = minio.BucketLookupDNS
|
||||
if c.useIAM {
|
||||
newMinioFn = aliyun.NewMinioClient
|
||||
} else {
|
||||
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
case CloudProviderGCP:
|
||||
newMinioFn = gcp.NewMinioClient
|
||||
if !c.useIAM {
|
||||
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
default: // aws, minio
|
||||
if c.useIAM {
|
||||
creds = credentials.NewIAM("")
|
||||
} else {
|
||||
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
}
|
||||
minioOpts := &minio.Options{
|
||||
BucketLookup: bucketLookupType,
|
||||
Creds: creds,
|
||||
Secure: c.useSSL,
|
||||
Region: c.region,
|
||||
}
|
||||
minIOClient, err := newMinioFn(c.address, minioOpts)
|
||||
// options nil or invalid formatted endpoint, don't need to retry
|
||||
minIOClient, err := newMinioClient(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var bucketExists bool
|
||||
// check valid in first query
|
||||
checkBucketFn := func() error {
|
||||
bucketExists, err = minIOClient.BucketExists(ctx, c.bucketName)
|
||||
if err != nil {
|
||||
log.Warn("failed to check blob bucket exist", zap.String("bucket", c.bucketName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
if !bucketExists {
|
||||
if c.createBucket {
|
||||
log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName))
|
||||
err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{})
|
||||
if err != nil {
|
||||
log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("bucket %s not Existed", c.bucketName)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
err = retry.Do(ctx, checkBucketFn, retry.Attempts(CheckBucketRetryAttempts))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mcm := &MinioChunkManager{
|
||||
Client: minIOClient,
|
||||
bucketName: c.bucketName,
|
||||
|
@ -20,9 +20,10 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -36,11 +37,16 @@ type MinioObjectStorage struct {
|
||||
*minio.Client
|
||||
}
|
||||
|
||||
func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) {
|
||||
func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) {
|
||||
var creds *credentials.Credentials
|
||||
newMinioFn := minio.New
|
||||
bucketLookupType := minio.BucketLookupAuto
|
||||
|
||||
if c.useVirtualHost {
|
||||
bucketLookupType = minio.BucketLookupDNS
|
||||
}
|
||||
|
||||
matchedDefault := false
|
||||
switch c.cloudProvider {
|
||||
case CloudProviderAliyun:
|
||||
// auto doesn't work for aliyun, so we set to dns deliberately
|
||||
@ -56,6 +62,34 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
|
||||
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
default: // aws, minio
|
||||
matchedDefault = true
|
||||
}
|
||||
|
||||
// Compatibility logic. If the cloud provider is not specified in the request,
|
||||
// it shall be inferred based on the service address.
|
||||
if matchedDefault {
|
||||
matchedDefault = false
|
||||
switch {
|
||||
case strings.Contains(c.address, gcp.GcsDefaultAddress):
|
||||
newMinioFn = gcp.NewMinioClient
|
||||
if !c.useIAM {
|
||||
creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
case strings.Contains(c.address, aliyun.OSSAddressFeatureString):
|
||||
// auto doesn't work for aliyun, so we set to dns deliberately
|
||||
bucketLookupType = minio.BucketLookupDNS
|
||||
if c.useIAM {
|
||||
newMinioFn = aliyun.NewMinioClient
|
||||
} else {
|
||||
creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
|
||||
}
|
||||
default:
|
||||
matchedDefault = true
|
||||
}
|
||||
}
|
||||
|
||||
if matchedDefault {
|
||||
// aws, minio
|
||||
if c.useIAM {
|
||||
creds = credentials.NewIAM("")
|
||||
} else {
|
||||
@ -66,6 +100,7 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
|
||||
BucketLookup: bucketLookupType,
|
||||
Creds: creds,
|
||||
Secure: c.useSSL,
|
||||
Region: c.region,
|
||||
}
|
||||
minIOClient, err := newMinioFn(c.address, minioOpts)
|
||||
// options nil or invalid formatted endpoint, don't need to retry
|
||||
@ -98,7 +133,14 @@ func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObje
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return minIOClient, nil
|
||||
}
|
||||
|
||||
func newMinioObjectStorageWithConfig(ctx context.Context, c *config) (*MinioObjectStorage, error) {
|
||||
minIOClient, err := newMinioClient(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &MinioObjectStorage{minIOClient}, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user