mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 20:09:57 +08:00
fix: Fix bug for read data from azure (#30007)
issue: #30005 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
This commit is contained in:
parent
5ee9f734c1
commit
6bfa826320
@ -90,44 +90,55 @@ func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObje
|
|||||||
return &AzureObjectStorage{Client: client}, nil
|
return &AzureObjectStorage{Client: client}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlobReader is implemented because Azure's stream body does not have ReadAt and Seek interfaces.
|
||||||
|
// BlobReader is not concurrency safe.
|
||||||
type BlobReader struct {
|
type BlobReader struct {
|
||||||
client *blockblob.Client
|
client *blockblob.Client
|
||||||
position int64
|
position int64
|
||||||
|
body io.ReadCloser
|
||||||
|
needResetStream bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlobReader(client *blockblob.Client, offset int64) (*BlobReader, error) {
|
func NewBlobReader(client *blockblob.Client, offset int64) (*BlobReader, error) {
|
||||||
return &BlobReader{client: client, position: offset}, nil
|
return &BlobReader{client: client, position: offset, needResetStream: true}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobReader) Read(p []byte) (n int, err error) {
|
func (b *BlobReader) Read(p []byte) (n int, err error) {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
|
|
||||||
opts := &azblob.DownloadStreamOptions{}
|
if b.needResetStream {
|
||||||
if b.position > 0 {
|
opts := &azblob.DownloadStreamOptions{
|
||||||
opts.Range = blob.HTTPRange{
|
Range: blob.HTTPRange{
|
||||||
Offset: b.position,
|
Offset: b.position,
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
object, err := b.client.DownloadStream(ctx, opts)
|
object, err := b.client.DownloadStream(ctx, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
n, err = object.Body.Read(p)
|
b.body = object.Body
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = b.body.Read(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
b.position += int64(n)
|
b.position += int64(n)
|
||||||
|
b.needResetStream = false
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobReader) Close() error {
|
func (b *BlobReader) Close() error {
|
||||||
|
if b.body != nil {
|
||||||
|
return b.body.Close()
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobReader) ReadAt(p []byte, off int64) (n int, err error) {
|
func (b *BlobReader) ReadAt(p []byte, off int64) (n int, err error) {
|
||||||
httpRange := blob.HTTPRange{
|
httpRange := blob.HTTPRange{
|
||||||
Offset: off,
|
Offset: off,
|
||||||
|
Count: int64(len(p)),
|
||||||
}
|
}
|
||||||
object, err := b.client.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
|
object, err := b.client.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
|
||||||
Range: httpRange,
|
Range: httpRange,
|
||||||
@ -158,6 +169,7 @@ func (b *BlobReader) Seek(offset int64, whence int) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
b.position = newOffset
|
b.position = newOffset
|
||||||
|
b.needResetStream = true
|
||||||
return newOffset, nil
|
return newOffset, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,10 +181,6 @@ func (AzureObjectStorage *AzureObjectStorage) GetObject(ctx context.Context, buc
|
|||||||
Count: size,
|
Count: size,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err := AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName).DownloadStream(ctx, &opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, checkObjectStorageError(objectName, err)
|
|
||||||
}
|
|
||||||
return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset)
|
return NewBlobReader(AzureObjectStorage.Client.NewContainerClient(bucketName).NewBlockBlobClient(objectName), offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,15 +101,11 @@ func TestAzureObjectStorage(t *testing.T) {
|
|||||||
_, err = testCM.GetObject(ctx, config.bucketName, test.loadKey, 1, 1023)
|
_, err = testCM.GetObject(ctx, config.bucketName, test.loadKey, 1, 1023)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
} else {
|
} else {
|
||||||
if test.loadKey == "/" {
|
|
||||||
got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024)
|
got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, got)
|
||||||
|
_, err = io.ReadAll(got)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.Empty(t, got)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
got, err := testCM.GetObject(ctx, config.bucketName, test.loadKey, 0, 1024)
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Empty(t, got)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -963,8 +963,8 @@ func TestAzureChunkManager(t *testing.T) {
|
|||||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
||||||
|
|
||||||
_, err = testCM.Reader(ctx, key)
|
_, err = testCM.Reader(ctx, key)
|
||||||
assert.Error(t, err)
|
// lazy error for real read
|
||||||
assert.True(t, errors.Is(err, merr.ErrIoKeyNotFound))
|
assert.NoError(t, err)
|
||||||
|
|
||||||
_, err = testCM.ReadAt(ctx, key, 100, 1)
|
_, err = testCM.ReadAt(ctx, key, 100, 1)
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user