Bulkinsert read varchar by batch (#26199)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
This commit is contained in:
groot 2023-08-30 14:23:06 +08:00 committed by GitHub
parent 30ca45868d
commit 1b1bafaff1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -561,9 +561,26 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
return nil, errors.New("numpy reader is nil")
}
// read string one by one is not efficient, here we read strings batch by batch, each bach size is no more than 16MB
batchRead := 1 // rows of each batch, make sure this value is equal or greater than 1
if utf {
batchRead += SingleBlockSize / (utf8.UTFMax * maxLen)
} else {
batchRead += SingleBlockSize / maxLen
}
log.Info("Numpy adapter: prepare to read varchar batch by batch",
zap.Int("readSize", readSize), zap.Int("batchRead", batchRead))
// read data
data := make([]string, 0)
for i := 0; i < readSize; i++ {
for {
// the last batch
readDone := len(data)
if readDone+batchRead > readSize {
batchRead = readSize - readDone
}
if utf {
// in the numpy file with utf32 encoding, the dType could be like "<U2",
// "<" is byteorder(LittleEndian), "U" means it is utf32 encoding, "2" means the max length of strings is 2(characters)
@ -572,36 +589,54 @@ func (n *NumpyAdapter) ReadString(count int) ([]string, error) {
// the character "a" occupys 2*4=8 bytes(0x97,0x00,0x00,0x00,0x00,0x00,0x00,0x00),
// the "bb" occupys 8 bytes(0x97,0x00,0x00,0x00,0x98,0x00,0x00,0x00)
// for non-ascii characters, the unicode could be 1 ~ 4 bytes, each character occupys 4 bytes, too
raw, err := ioutil.ReadAll(io.LimitReader(n.reader, utf8.UTFMax*int64(maxLen)))
raw, err := ioutil.ReadAll(io.LimitReader(n.reader, utf8.UTFMax*int64(maxLen)*int64(batchRead)))
if err != nil {
log.Warn("Numpy adapter: failed to read utf32 bytes from numpy file", zap.Int("i", i), zap.Error(err))
log.Warn("Numpy adapter: failed to read utf32 bytes from numpy file",
zap.Int("readDone", readDone), zap.Error(err))
return nil, fmt.Errorf("failed to read utf32 bytes from numpy file, error: %w", err)
}
str, err := decodeUtf32(raw, n.order)
if err != nil {
log.Warn("Numpy adapter: failed todecode utf32 bytes", zap.Int("i", i), zap.Error(err))
return nil, fmt.Errorf("failed to decode utf32 bytes, error: %w", err)
}
// read string one by one from the buffer
for j := 0; j < batchRead; j++ {
str, err := decodeUtf32(raw[j*utf8.UTFMax*maxLen:(j+1)*utf8.UTFMax*maxLen], n.order)
if err != nil {
log.Warn("Numpy adapter: failed todecode utf32 bytes",
zap.Int("position", readDone+j), zap.Error(err))
return nil, fmt.Errorf("failed to decode utf32 bytes, error: %w", err)
}
data = append(data, str)
data = append(data, str)
}
} else {
// in the numpy file with ansi encoding, the dType could be like "S2", maxLen is 2, each string occupys 2 bytes
// bytes.Index(buf, []byte{0}) tell us which position is the end of the string
buf, err := ioutil.ReadAll(io.LimitReader(n.reader, int64(maxLen)))
buf, err := ioutil.ReadAll(io.LimitReader(n.reader, int64(maxLen)*int64(batchRead)))
if err != nil {
log.Warn("Numpy adapter: failed to read ascii bytes from numpy file", zap.Int("i", i), zap.Error(err))
log.Warn("Numpy adapter: failed to read ascii bytes from numpy file",
zap.Int("readDone", readDone), zap.Error(err))
return nil, fmt.Errorf("failed to read ascii bytes from numpy file, error: %w", err)
}
n := bytes.Index(buf, []byte{0})
if n > 0 {
buf = buf[:n]
}
data = append(data, string(buf))
// read string one by one from the buffer
for j := 0; j < batchRead; j++ {
oneBuf := buf[j*maxLen : (j+1)*maxLen]
n := bytes.Index(oneBuf, []byte{0})
if n > 0 {
oneBuf = oneBuf[:n]
}
data = append(data, string(oneBuf))
}
}
// quit the circle if specified size is read
if len(data) >= readSize {
break
}
}
log.Info("Numpy adapter: a block of varchar has been read", zap.Int("rowCount", len(data)))
// update read position after successfully read
n.readPosition += readSize