[skip ci]Add chunk manager comment (#7833)

Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
godchen 2021-09-17 14:49:50 +08:00 committed by GitHub
parent f44b5ee1d0
commit 31df0faaf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 0 deletions

View File

@ -22,16 +22,19 @@ import (
"github.com/milvus-io/milvus/internal/log"
)
// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
localPath string
}
// NewLocalChunkManager create a new local manager object.
func NewLocalChunkManager(localPath string) *LocalChunkManager {
return &LocalChunkManager{
localPath: localPath,
}
}
// GetPath returns the path of local data if exist.
func (lcm *LocalChunkManager) GetPath(key string) (string, error) {
if !lcm.Exist(key) {
return "", errors.New("local file cannot be found with key:" + key)
@ -40,6 +43,7 @@ func (lcm *LocalChunkManager) GetPath(key string) (string, error) {
return path, nil
}
// Write writes the data to local storage.
func (lcm *LocalChunkManager) Write(key string, content []byte) error {
filePath := path.Join(lcm.localPath, key)
dir := path.Dir(filePath)
@ -56,6 +60,7 @@ func (lcm *LocalChunkManager) Write(key string, content []byte) error {
return nil
}
// Exist checks whether chunk is saved to local storage.
func (lcm *LocalChunkManager) Exist(key string) bool {
path := path.Join(lcm.localPath, key)
_, err := os.Stat(path)
@ -65,6 +70,7 @@ func (lcm *LocalChunkManager) Exist(key string) bool {
return true
}
// Read reads the local storage data if exist.
func (lcm *LocalChunkManager) Read(key string) ([]byte, error) {
path := path.Join(lcm.localPath, key)
file, err := os.Open(path)
@ -79,6 +85,7 @@ func (lcm *LocalChunkManager) Read(key string) ([]byte, error) {
return content, nil
}
// ReadAt reads specific position data of local storage if exist.
func (lcm *LocalChunkManager) ReadAt(key string, p []byte, off int64) (n int, err error) {
path := path.Join(lcm.localPath, key)
at, err := mmap.Open(path)

View File

@ -18,16 +18,19 @@ import (
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
)
// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
minio *miniokv.MinIOKV
}
// NewMinioChunkManager create a new local manager object.
func NewMinioChunkManager(minio *miniokv.MinIOKV) *MinioChunkManager {
return &MinioChunkManager{
minio: minio,
}
}
// GetPath returns the path of minio data if exist.
func (mcm *MinioChunkManager) GetPath(key string) (string, error) {
if !mcm.Exist(key) {
return "", errors.New("minio file manage cannot be found with key:" + key)
@ -35,19 +38,23 @@ func (mcm *MinioChunkManager) GetPath(key string) (string, error) {
return key, nil
}
// Write writes the data to minio storage.
func (mcm *MinioChunkManager) Write(key string, content []byte) error {
return mcm.minio.Save(key, string(content))
}
// Exist checks whether chunk is saved to minio storage.
func (mcm *MinioChunkManager) Exist(key string) bool {
return mcm.minio.Exist(key)
}
// Read reads the minio storage data if exist.
func (mcm *MinioChunkManager) Read(key string) ([]byte, error) {
results, err := mcm.minio.Load(key)
return []byte(results), err
}
// ReadAt reads specific position data of minio storage if exist.
func (mcm *MinioChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
results, err := mcm.minio.Load(key)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/etcdpb"
)
// VectorChunkManager is responsible for read and write vector data.
type VectorChunkManager struct {
localChunkManager ChunkManager
remoteChunkManager ChunkManager
@ -29,6 +30,7 @@ type VectorChunkManager struct {
localCacheEnable bool
}
// NewVectorChunkManager create a new vector manager object.
func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager ChunkManager, schema *etcdpb.CollectionMeta, localCacheEnable bool) *VectorChunkManager {
return &VectorChunkManager{
localChunkManager: localChunkManager,
@ -39,6 +41,9 @@ func NewVectorChunkManager(localChunkManager ChunkManager, remoteChunkManager Ch
}
}
// For vector data, we will download vector file from storage. And we will
// deserialize the file for it has binlog style. At last we store pure vector
// data to local storage as cache.
func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
if vcm.localChunkManager.Exist(key) {
return vcm.localChunkManager.Read(key)
@ -78,6 +83,8 @@ func (vcm *VectorChunkManager) downloadVectorFile(key string) ([]byte, error) {
return results, nil
}
// GetPath returns the path of vector data. If cached, return local path.
// If not cached return remote path.
func (vcm *VectorChunkManager) GetPath(key string) (string, error) {
if vcm.localChunkManager.Exist(key) && vcm.localCacheEnable {
return vcm.localChunkManager.GetPath(key)
@ -85,6 +92,7 @@ func (vcm *VectorChunkManager) GetPath(key string) (string, error) {
return vcm.remoteChunkManager.GetPath(key)
}
// Write writes the vector data to local cache if cache enabled.
func (vcm *VectorChunkManager) Write(key string, content []byte) error {
if !vcm.localCacheEnable {
return errors.New("Cannot write local file for local cache is not allowed")
@ -92,10 +100,12 @@ func (vcm *VectorChunkManager) Write(key string, content []byte) error {
return vcm.localChunkManager.Write(key, content)
}
// Exist checks whether vector data is saved to local cache.
func (vcm *VectorChunkManager) Exist(key string) bool {
return vcm.localChunkManager.Exist(key)
}
// Read reads the pure vector data. If cached, it reads from local.
func (vcm *VectorChunkManager) Read(key string) ([]byte, error) {
if vcm.localCacheEnable {
if vcm.localChunkManager.Exist(key) {
@ -114,6 +124,7 @@ func (vcm *VectorChunkManager) Read(key string) ([]byte, error) {
return vcm.downloadVectorFile(key)
}
// ReadAt reads specific position data of vector. If cached, it reads from local.
func (vcm *VectorChunkManager) ReadAt(key string, p []byte, off int64) (int, error) {
if vcm.localCacheEnable {
if vcm.localChunkManager.Exist(key) {