From 2952dc03f5427bdaa26fb41a861d6a0ca052fe47 Mon Sep 17 00:00:00 2001 From: Calvin <179209347@qq.com> Date: Sun, 13 Feb 2022 19:08:17 +0800 Subject: [PATCH] upgrade milvus to v2.0 --- 6_biomedicine/dna_sequence_search/README.md | 79 +-- .../dna-search-ui/package.json | 2 +- .../dna_sequence_search.iml | 319 ---------- .../dna_sequence_search/pom.xml | 2 +- .../me/aias/common/milvus/ConnectionPool.java | 235 ++++++++ .../aias/common/milvus/MilvusConnector.java | 37 -- .../me/aias/config/MilvusConfiguration.java | 26 - .../me/aias/config/ModelConfiguration.java | 2 +- .../me/aias/controller/DNASeqController.java | 41 +- .../me/aias/controller/SearchController.java | 51 +- .../java/me/aias/service/SearchService.java | 103 ++-- .../aias/service/impl/SearchServiceImpl.java | 560 +++++++++++------- .../main/java/me/aias/tools/MilvusInit.java | 107 +++- .../src/main/resources/application.yml | 28 +- .../molecular-search/molecular-search.iml | 207 ------- 15 files changed, 773 insertions(+), 1026 deletions(-) delete mode 100644 6_biomedicine/dna_sequence_search/dna_sequence_search/dna_sequence_search.iml create mode 100644 6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/ConnectionPool.java delete mode 100644 6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/MilvusConnector.java delete mode 100644 6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/MilvusConfiguration.java delete mode 100644 6_biomedicine/molecular_search/molecular-search/molecular-search.iml diff --git a/6_biomedicine/dna_sequence_search/README.md b/6_biomedicine/dna_sequence_search/README.md index 8485e625..55ec1f34 100644 --- a/6_biomedicine/dna_sequence_search/README.md +++ b/6_biomedicine/dna_sequence_search/README.md @@ -5,7 +5,7 @@ http://aias.top/ 本例子提供了DNA序列搜索,支持上传文件文件,使用spark mlib计算模型提取特征,并基于milvus向量引擎进行后续检索。
- +
#### 引擎特性 @@ -31,7 +31,7 @@ DNA序列测定方法有光学测序和芯片测序两种。 超参数minDF则指定词汇表中的词语至少要在多少个不同文档中出现。 模型训练,推理使用了spark mlib:
- +
- [算法详细介绍](http://spark.apache.org/docs/latest/ml-features.html#countvectorizer) @@ -39,7 +39,7 @@ DNA序列测定方法有光学测序和芯片测序两种。 #### 向量引擎索引策略
- +
@@ -100,7 +100,7 @@ file: linux: path: /home/aias/file/ windows: - path: D:/aias/file/ + path: file:/D:/aias/file/ # 文件大小 /M maxSize: 3000 ... @@ -114,51 +114,30 @@ java -jar dna-sequence-search-0.1.0.jar ``` -## 3. 后端向量引擎部署(docker) +### 3. 后端向量引擎部署(Milvus 2.0) #### 3.1 环境要求: - 需要安装docker运行环境,Mac环境可以使用Docker Desktop #### 3.2 拉取Milvus向量引擎镜像(用于计算特征值向量相似度) -[安装文档](https://github.com/milvus-io/docs/blob/master/v0.10.0/site/zh-CN/quick_start/install_milvus/cpu_milvus_docker.md) -##### 最新版本请参考官网 -- Milvus向量引擎参考链接 -[Milvus向量引擎官网](https://milvus.io/cn/docs/overview.md) -[Milvus向量引擎Github](https://github.com/milvus-io) - +下载 milvus-standalone-docker-compose.yml 配置文件并保存为 docker-compose.yml +[单机版安装文档](https://milvus.io/docs/v2.0.0/install_standalone-docker.md) ```bash -sudo docker pull milvusdb/milvus:0.10.0-cpu-d061620-5f3c00 +wget https://github.com/milvus-io/milvus/releases/download/v2.0.0/milvus-standalone-docker-compose.yml -O docker-compose.yml ``` -#### 3.3 下载配置文件 -[vector_engine.zip](https://aias-home.oss-cn-beijing.aliyuncs.com/AIAS/image_search/vector_engine.zip) - -#### 3.4 启动 Docker 容器 -/Users/calvin/vector_engine为主机路径,根据需要修改。conf下为引擎所需的配置文件。 +#### 3.3 启动 Docker 容器 ```bash -docker run -d --name milvus_cpu_0.10.0 \ --p 19530:19530 \ --p 19121:19121 \ --p 9091:9091 \ --v /Users/calvin/vector_engine/db:/var/lib/milvus/db \ --v /Users/calvin/vector_engine/conf:/var/lib/milvus/conf \ --v /Users/calvin/vector_engine/logs:/var/lib/milvus/logs \ --v /Users/calvin/vector_engine/wal:/var/lib/milvus/wal \ -milvusdb/milvus:0.10.0-cpu-d061620-5f3c00 +sudo docker-compose up -d ``` #### 3.5 编辑向量引擎连接配置信息 - application.yml - 根据需要编辑向量引擎连接ip地址127.0.0.1为容器所在的主机ip ```bash -##################### 向量引擎 ############################### +################## 向量引擎 ################ search: host: 127.0.0.1 port: 19530 - indexFileSize: 1024 # maximum size (in MB) of each index file - nprobe: 16 - nlist: 16384 - collectionName: dna #collection name - ``` #### 4. 打开浏览器 @@ -166,11 +145,11 @@ search: - 上传数据文件 1). 点击上传按钮上传文件. -[测试数据](https://aias-home.oss-cn-beijing.aliyuncs.com/AIAS/6_biomedicine_sdks/dna_sequence_search/human_data.txt) +[测试数据](https://aias-home.oss-cn-beijing.aliyuncs.com/AIAS/6_biomedicine/dna_sequence_search/human_data.txt) 2). 点击特征提取按钮. 等待文件解析,模型训练,特征提取,特征存入向量引擎。通过console可以看到进度信息。
- +
- DNA序列搜索 @@ -179,42 +158,26 @@ search: ATGCCCCAACTAAATACTACCGTATGGCCCACCATAATTACCCCCATACTCCTTACACTATTCCTCATCACCCAACTAAAAATATTAAACACAAACTACCACCTACCTCCCTCACCAAAGCCCATAAAAATAAAAAATTATAACAAACCCTGAGAACCAAAATGAACGAAAATCTGTTCGCTTCATTCATTGCCCCCACAATCCTAG ```
- +
## 5. 帮助信息 - swagger接口文档: http://localhost:8089/swagger-ui.html
- +
- 初始化向量引擎(清空数据): -me.aias.tools.MilvusInit.java + ```bash - String host = "127.0.0.1"; - int port = 19530; - final String collectionName = "dna"; // collection name - - MilvusClient client = new MilvusGrpcClient(); - // Connect to Milvus server - ConnectParam connectParam = new ConnectParam.Builder().withHost(host).withPort(port).build(); - try { - Response connectResponse = client.connect(connectParam); - } catch (ConnectFailedException e) { - e.printStackTrace(); - } - - // 检查 collection 是否存在 - HasCollectionResponse hasCollection = hasCollection(client, collectionName); - if (hasCollection.hasCollection()) { - dropIndex(client, collectionName); - dropCollection(client, collectionName); - } - ... - +me.aias.tools.MilvusInit.java ``` +- Milvus向量引擎参考链接 +[Milvus向量引擎官网](https://milvus.io/cn/docs/overview.md) +[Milvus向量引擎Github](https://github.com/milvus-io) + ### 官网: [官网链接](http://www.aias.top/) diff --git a/6_biomedicine/dna_sequence_search/dna-search-ui/package.json b/6_biomedicine/dna_sequence_search/dna-search-ui/package.json index a2a7c68c..d458537d 100644 --- a/6_biomedicine/dna_sequence_search/dna-search-ui/package.json +++ b/6_biomedicine/dna_sequence_search/dna-search-ui/package.json @@ -15,7 +15,7 @@ "dependencies": { "crypto-js": "^3.1.9-1", "axios": "0.18.1", - "core-js": "3.6.5", + "core-js": "3.21.0", "easy-circular-progress": "1.0.4", "echarts": "^4.2.1", "element-ui": "2.13.2", diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/dna_sequence_search.iml b/6_biomedicine/dna_sequence_search/dna_sequence_search/dna_sequence_search.iml deleted file mode 100644 index ebe77dd2..00000000 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/dna_sequence_search.iml +++ /dev/null @@ -1,319 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/pom.xml b/6_biomedicine/dna_sequence_search/dna_sequence_search/pom.xml index 5d129450..3e807e53 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/pom.xml +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/pom.xml @@ -65,7 +65,7 @@ io.milvus milvus-sdk-java - 0.8.2 + 2.0.0 diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/ConnectionPool.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/ConnectionPool.java new file mode 100644 index 00000000..b1567395 --- /dev/null +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/ConnectionPool.java @@ -0,0 +1,235 @@ +package me.aias.common.milvus; + +import io.milvus.client.MilvusClient; +import io.milvus.client.MilvusServiceClient; +import io.milvus.param.ConnectParam; + +import java.util.Enumeration; +import java.util.Vector; + +public class ConnectionPool { + private String host = ""; // Milvus 主机 + private int port; // Milvus 端口号 + private static volatile ConnectionPool uniqueInstance; + private int initialConnections = 10; // 连接池的初始大小 + private int incrementalConnections = 5; // 连接池自动增加的大小 + private int maxConnections = 50; // 连接池最大的大小 + private Vector connections = null; // 存放连接池中连接的向量, 存放的对象为 PooledConnection 型 + + private ConnectionPool(String host, int port) { + this.host = host; + this.port = port; + } + + public static ConnectionPool getInstance(String host, String port, boolean refresh) { + if (uniqueInstance == null || refresh) { + synchronized (ConnectionPool.class) { + if (uniqueInstance == null || refresh) { + uniqueInstance = new ConnectionPool(host, Integer.parseInt(port)); + try { + uniqueInstance.createPool(); + } catch (Exception ex) { + ex.printStackTrace(); + } + } + } + } + return uniqueInstance; + } + + private void createPool() { // synchronized + if (connections != null) { + return; // 假如己经创建,则返回 + } + // 创建保存连接的向量 , 初始时有 0 个元素 + connections = new Vector(); + // 根据 initialConnections 中设置的值,创建连接。 + + createConnections(this.initialConnections); + + System.out.println(" Milvus连接池创建成功!"); + } + + private void createConnections(int numConnections) { + // 循环创建指定数目的数据库连接 + for (int x = 0; x < numConnections; x++) { + // 是否连接池中的Milvus连接数量己经达到最大?最大值由类成员 maxConnections + if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) { + break; + } + // 增加一个连接到连接池中(Vector connections) + connections.addElement(new PooledConnection(newConnection())); + System.out.println(" Milvus连接己创建 ......"); + } + } + + private MilvusClient newConnection() { + // 创建一个 Milvus 客户端 + ConnectParam connectParam = ConnectParam.newBuilder() + .withHost(host) + .withPort(port) + .build(); + + MilvusServiceClient milvusClient = new MilvusServiceClient(connectParam); + // 返回创建的新的Milvus连接 + return milvusClient; + } + + public synchronized MilvusClient getConnection() { + // 确保连接池己被创建 + if (connections == null) { + return null; // 连接池还没创建,则返回 null + } + MilvusClient client = getFreeConnection(); // 获得一个可用的数据库连接 + // 假如目前没有可以使用的连接,即所有的连接都在使用中 + while (client == null) { + // 等一会再试 250 ms + wait(250); + client = getFreeConnection(); // 重新再试,直到获得可用的连接,假如 + // getFreeConnection() 返回的为 null + // 则表明创建一批连接后也不可获得可用连接 + } + return client; // 返回获得的可用的连接 + } + + private MilvusClient getFreeConnection() { + // 从连接池中获得一个可用的Milvus连接 + MilvusClient client = findFreeConnection(); + if (client == null) { + // 假如目前连接池中没有可用的连接 + // 创建一些连接 + createConnections(incrementalConnections); + // 重新从池中查找是否有可用连接 + client = findFreeConnection(); + if (client == null) { + // 假如创建连接后仍获得不到可用的连接,则返回 null + return null; + } + } + return client; + } + + private MilvusClient findFreeConnection() { + MilvusClient client = null; + PooledConnection pConn = null; + // 获得连接池中所有的对象 + Enumeration enumerate = connections.elements(); + // 遍历所有的对象,看是否有可用的连接 + while (enumerate.hasMoreElements()) { + pConn = (PooledConnection) enumerate.nextElement(); + if (!pConn.isBusy()) { + // 假如此对象不忙,则获得它的数据库连接并把它设为忙 + client = pConn.getConnection(); + pConn.setBusy(true); + break; // 己经找到一个可用的连接,退出 + } + } + return client; // 返回找到到的可用连接 + } + + public void returnConnection(MilvusClient client) { + // 确保连接池存在,假如连接没有创建(不存在),直接返回 + if (connections == null) { + System.out.println(" 连接池不存在,无法返回此连接到连接池中 !"); + return; + } + PooledConnection pConn = null; + Enumeration enumerate = connections.elements(); + // 遍历连接池中的所有连接,找到这个要返回的连接对象 + while (enumerate.hasMoreElements()) { + pConn = (PooledConnection) enumerate.nextElement(); + // 先找到连接池中的要返回的连接对象 + if (client == pConn.getConnection()) { + // 找到了 , 设置此连接为空闲状态 + pConn.setBusy(false); + break; + } + } + } + + public synchronized void refreshConnections() { + // 确保连接池己创新存在 + if (connections == null) { + System.out.println(" 连接池不存在,无法刷新 !"); + return; + } + PooledConnection pConn = null; + Enumeration enumerate = connections.elements(); + while (enumerate.hasMoreElements()) { + // 获得一个连接对象 + pConn = (PooledConnection) enumerate.nextElement(); + // 假如对象忙则等 5 秒 ,5 秒后直接刷新 + if (pConn.isBusy()) { + wait(5000); // 等 5 秒 + } + // 关闭此连接,用一个新的连接代替它。 + closeConnection(pConn.getConnection()); + pConn.setConnection(newConnection()); + pConn.setBusy(false); + } + } + + public synchronized void closeConnectionPool() { + // 确保连接池存在,假如不存在,返回 + if (connections == null) { + System.out.println(" 连接池不存在,无法关闭 !"); + return; + } + PooledConnection pConn = null; + Enumeration enumerate = connections.elements(); + while (enumerate.hasMoreElements()) { + pConn = (PooledConnection) enumerate.nextElement(); + // 假如忙,等 5 秒 + if (pConn.isBusy()) { + wait(5000); // 等 5 秒 + } + // 5 秒后直接关闭它 + closeConnection(pConn.getConnection()); + // 从连接池向量中删除它 + connections.removeElement(pConn); + } + // 置连接池为空 + connections = null; + } + + private void closeConnection(MilvusClient client) { + client.close(); + } + + private void wait(int mSeconds) { + try { + Thread.sleep(mSeconds); + } catch (InterruptedException e) { + } + } + + class PooledConnection { + MilvusClient client = null; // Milvus连接 + boolean busy = false; // 此连接是否正在使用的标志,默认没有正在使用 + + // 构造函数,根据一个 Connection 构告一个 PooledConnection 对象 + public PooledConnection(MilvusClient client) { + this.client = client; + } + + // 返回此对象中的连接 + public MilvusClient getConnection() { + return client; + } + + // 设置此对象的连接 + public void setConnection(MilvusClient client) { + this.client = client; + } + + // 获得对象连接是否忙 + public boolean isBusy() { + return busy; + } + + // 设置对象的连接正在忙 + public void setBusy(boolean busy) { + this.busy = busy; + } + } +} diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/MilvusConnector.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/MilvusConnector.java deleted file mode 100644 index 07d88349..00000000 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/common/milvus/MilvusConnector.java +++ /dev/null @@ -1,37 +0,0 @@ -package me.aias.common.milvus; - -import io.milvus.client.*; -import lombok.extern.slf4j.Slf4j; - -/** - * Milvus Client Connector - * - * @author Calvin - * @date 2021-12-19 - **/ -@Slf4j -public final class MilvusConnector { - - // 创建一个 Milvus 客户端 - private MilvusClient client = null; - - public void init(String host, int port) { - client = new MilvusGrpcClient(); - // Connect to Milvus server - ConnectParam connectParam = new ConnectParam.Builder().withHost(host).withPort(port).build(); - try { - Response connectResponse = client.connect(connectParam); - } catch (ConnectFailedException e) { - log.error("Failed to connect to Milvus server: " + e.toString()); - } - - } - - public void close() throws InterruptedException { - this.client.disconnect(); - } - - public MilvusClient getClient() { - return client; - } -} diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/MilvusConfiguration.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/MilvusConfiguration.java deleted file mode 100644 index 833eb830..00000000 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/MilvusConfiguration.java +++ /dev/null @@ -1,26 +0,0 @@ -package me.aias.config; - -import me.aias.common.milvus.MilvusConnector; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * Milvus配置类 - * @author Calvin - * @date 2021-12-12 - **/ -@Configuration -public class MilvusConfiguration { - @Value("${search.host}") - private String host; - @Value("${search.port}") - private int port; - - @Bean - public MilvusConnector milvusConnector() { - MilvusConnector milvus = new MilvusConnector(); - milvus.init(host, port); - return milvus; - } -} \ No newline at end of file diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/ModelConfiguration.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/ModelConfiguration.java index 721dfd83..3adaf9ed 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/ModelConfiguration.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/config/ModelConfiguration.java @@ -14,7 +14,7 @@ import org.springframework.context.annotation.Configuration; @Configuration public class ModelConfiguration { // 设定词汇表的最大量为768 - @Value("${search.dimension}") + @Value("${search.size}") private int size; @Bean diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/DNASeqController.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/DNASeqController.java index 836fd65c..fdd61b0e 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/DNASeqController.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/DNASeqController.java @@ -1,7 +1,6 @@ package me.aias.controller; -import io.milvus.client.ConnectFailedException; -import io.milvus.client.HasCollectionResponse; +import io.milvus.param.R; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; @@ -10,13 +9,8 @@ import me.aias.common.sentence.VectorizerModel; import me.aias.common.utils.DataUtils; import me.aias.common.utils.FeatureUtils; import me.aias.common.utils.FileUtils; -import me.aias.config.FileProperties; -import me.aias.domain.DNAInfoDto; -import me.aias.domain.LocalStorage; -import me.aias.domain.ResEnum; -import me.aias.domain.ResultRes; +import me.aias.domain.*; import me.aias.service.DNAService; -import me.aias.service.FeatureService; import me.aias.service.LocalStorageService; import me.aias.service.SearchService; import org.apache.spark.ml.linalg.DenseVector; @@ -52,7 +46,6 @@ import java.util.concurrent.ConcurrentHashMap; @Api(tags = "DNA数据管理") @RequestMapping("/api/text") public class DNASeqController { - private final FileProperties properties; @Autowired private VectorizerModel vectorizerModel; @@ -62,18 +55,9 @@ public class DNASeqController { @Autowired private SearchService searchService; - @Autowired - private FeatureService featureService; - @Autowired private LocalStorageService localStorageService; - @Value("${search.dimension}") - int dimension; - - @Value("${search.collectionName}") - String collectionName; - @ApiOperation(value = "提取DNA特征值") @GetMapping("/extractFeatures") public ResponseEntity extractFeatures(@RequestParam(value = "id") String id) { @@ -107,6 +91,7 @@ public class DNASeqController { // 解析DNA信息 ConcurrentHashMap map = textService.getMap(); long size = map.size(); + int dimension = 0; for (int i = 0; i < rowList.size(); i++) { textInfoDto = new DNAInfoDto(); String label = rowList.get(i).getString(0); @@ -129,21 +114,25 @@ public class DNASeqController { // 将向量插入向量引擎 try { - - HasCollectionResponse response = searchService.hasCollection(this.collectionName); - if (!response.hasCollection()) { - searchService.createCollection(this.collectionName, dimension); - searchService.createIndex(this.collectionName); + R response = searchService.hasCollection(); + if (!response.getData()) { + searchService.initSearchEngine(dimension); } - searchService.insertVectors(this.collectionName, list); + List vectorIds = new ArrayList<>(); + List> vectors = new ArrayList<>(); + for (DNAInfoDto textInfo : list) { + vectorIds.add(textInfo.getId()); + vectors.add(textInfo.getFeature()); + } + searchService.insert(vectorIds, vectors); textService.addTexts(list); - } catch (ConnectFailedException e) { + } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); return new ResponseEntity<>(ResultRes.error(ResEnum.MILVUS_CONNECTION_ERROR.KEY, ResEnum.MILVUS_CONNECTION_ERROR.VALUE), HttpStatus.OK); } - return new ResponseEntity<>(ResultRes.success(), HttpStatus.OK); + return new ResponseEntity<>(ResultBean.success(), HttpStatus.OK); } } \ No newline at end of file diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/SearchController.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/SearchController.java index f2d534cb..8af41251 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/SearchController.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/controller/SearchController.java @@ -1,8 +1,9 @@ package me.aias.controller; import com.google.common.collect.Lists; -import io.milvus.client.ConnectFailedException; -import io.milvus.client.SearchResponse; +import io.milvus.Response.SearchResultsWrapper; +import io.milvus.grpc.SearchResults; +import io.milvus.param.R; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.RequiredArgsConstructor; @@ -49,9 +50,6 @@ public class SearchController { @Autowired private FeatureService featureService; - @Value("${search.collectionName}") - String collectionName; - @GetMapping("/sequence") @ApiOperation(value = "DNA序列搜索", nickname = "search") public ResultBean search(@RequestParam("sequence") String sequence, @RequestParam(value = "topK") String topk) { @@ -63,8 +61,8 @@ public class SearchController { new StructField("kmers", new ArrayType(DataTypes.StringType, false), false, Metadata.empty()) }); - Long topK = Long.parseLong(topk); - List vectorToSearch = null; + Integer topK = Integer.parseInt(topk); + List vectorToSearch; try { //获取数据 DataFrames List rawData = DataUtils.getRawData("", sequence); @@ -81,50 +79,27 @@ public class SearchController { try { // 根据向量搜索 - SearchResponse searchResponse = searchService.search(this.collectionName, topK, vectorsToSearch); - List> resultIds = searchResponse.getResultIdsList(); - if (resultIds == null || resultIds.size() == 0) { - return ResultBean.failure().add(ResEnum.INFO_NOT_FOUND.KEY, ResEnum.INFO_NOT_FOUND.VALUE); - } - List idList = Lists.transform(resultIds.get(0), (entity) -> { - return entity.toString(); - }); + R searchResponse = searchService.search(topK, vectorsToSearch); + SearchResultsWrapper wrapper = new SearchResultsWrapper(searchResponse.getData().getResults()); + List scores = wrapper.getIDScore(0); // 根据ID获取文本信息 ConcurrentHashMap map = textService.getMap(); List textInfoResList = new ArrayList<>(); - for (String uid : idList) { - Long id = Long.parseLong(uid); - DNAInfoDto dnaInfoDto = map.get(id); + for (SearchResultsWrapper.IDScore score : scores) { + DNAInfoDto dnaInfoDto = map.get(score.getLongID()); DNAInfoRes textInfoRes = new DNAInfoRes(); - Float score = maxScoreForTextId(searchResponse, id); - textInfoRes.setId(id); - textInfoRes.setScore(score); + textInfoRes.setId(score.getLongID()); + textInfoRes.setScore(score.getScore()); textInfoRes.setLabel(dnaInfoDto.getLabel()); textInfoRes.setSequence(dnaInfoDto.getSequence()); textInfoResList.add(textInfoRes); } - return ResultBean.success().add("result", textInfoResList); -// return new ResponseEntity<>(ResultRes.success(textInfoResList, textInfoResList.size()), HttpStatus.OK); - } catch (ConnectFailedException e) { + } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); return ResultBean.failure().add(ResEnum.MILVUS_CONNECTION_ERROR.KEY, ResEnum.MILVUS_CONNECTION_ERROR.VALUE); } } - - private Float maxScoreForTextId(SearchResponse searchResponse, Long id) { - float maxScore = -1; - List list = searchResponse.getQueryResultsList().get(0); - for (SearchResponse.QueryResult result : list) { - if (result.getVectorId() == id.longValue()) { - if (result.getDistance() > maxScore) { - maxScore = result.getDistance(); - } - } - } - - return maxScore; - } } diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/SearchService.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/SearchService.java index 3e4cdeeb..f072eba3 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/SearchService.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/SearchService.java @@ -1,80 +1,71 @@ package me.aias.service; -import io.milvus.client.*; -import me.aias.domain.DNAInfoDto; +import io.milvus.client.MilvusClient; +import io.milvus.grpc.MutationResult; +import io.milvus.grpc.QueryResults; +import io.milvus.grpc.SearchResults; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import me.aias.common.milvus.ConnectionPool; import java.util.List; -/** - * 搜索服务接口 - * - * @author Calvin - * @date 2021-12-19 - **/ public interface SearchService { - // 引擎初始化 - void initSearchEngine() throws ConnectFailedException; + // 重置向量引擎 + void clearSearchEngine(); + + // 初始化向量引擎 + void initSearchEngine(Integer dimension); + + // 获取连接池 + ConnectionPool getConnectionPool(boolean refresh); + + // 获取Milvus Client + MilvusClient getClient(ConnectionPool connPool); // 检查是否存在 collection - HasCollectionResponse hasCollection(MilvusClient client, String collectionName); + void returnConnection(ConnectionPool connPool, MilvusClient client); - HasCollectionResponse hasCollection(String collectionName) throws ConnectFailedException; + // 检查是否存在 collection + R hasCollection(MilvusClient milvusClient); + R hasCollection(); // 创建 collection - Response createCollection( - MilvusClient client, String collectionName, long dimension, long indexFileSize); + R createCollection(MilvusClient milvusClient, Integer dimension, long timeoutMiliseconds) ; - Response createCollection(String collectionName, long dimension) throws ConnectFailedException; + // 加载 collection + R loadCollection(MilvusClient milvusClient); + + // 释放 collection + R releaseCollection(MilvusClient milvusClient); // 删除 collection - Response dropCollection(MilvusClient client, String collectionName); + R dropCollection(MilvusClient milvusClient); - // 查看 collection 信息 - Response getCollectionStats(MilvusClient client, String collectionName); + // 创建 分区 + R createPartition(MilvusClient milvusClient, String partitionName); - void insertVectors(String collectionName, Long id, List feature) throws ConnectFailedException; + // 删除 分区 + R dropPartition(MilvusClient milvusClient, String partitionName); - void insertVectors(String collectionName, List vectorIds, List> vectors) throws ConnectFailedException; - - void insertVectors(String collectionName, List list) throws ConnectFailedException; - - InsertResponse insertVectors( - MilvusClient client, String collectionName, List vectorIds, List> vectors); - - // 查询向量数量 - long count(MilvusClient client, String collectionName); - - // 根据ID获取向量 - GetEntityByIDResponse getEntityByID( - MilvusClient client, String collectionName, List vectorIds); - - // 搜索向量 - SearchResponse search(String collectionName, long topK, List> vectorsToSearch) throws ConnectFailedException; - - SearchResponse search( - MilvusClient client, - String collectionName, - int nprobe, - long topK, - List> vectorsToSearch); - - // 删除向量 - Response deleteVectorsByIds(MilvusClient client, String collectionName, List vectorIds); + // 是否存在分区 + R hasPartition(MilvusClient milvusClient, String partitionName); // 创建 index - Response createIndex(MilvusClient client, String collectionName); - - Response createIndex(String collectionName) throws ConnectFailedException; - - // 查看索引信息 - GetIndexInfoResponse getIndexInfo(MilvusClient client, String collectionName); + R createIndex(MilvusClient client); // 删除 index - Response dropIndex(MilvusClient client, String collectionName); + R dropIndex(MilvusClient client); - // 压缩 collection - Response compactCollection(MilvusClient client, String collectionName); + // 插入向量 + R insert(List vectorIds, List> vectors); - // 检查 collection 中是否有 partition "tag" - HasPartitionResponse hasPartition(MilvusClient client, String collectionName, String tag); + // 查询向量 + R query(String expr); + + // 搜索向量 + R search(Integer topK, List> vectorsToSearch); + + // 删除向量 + R delete(String expr); } diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/impl/SearchServiceImpl.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/impl/SearchServiceImpl.java index bd388ebe..d23f1ea0 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/impl/SearchServiceImpl.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/service/impl/SearchServiceImpl.java @@ -1,24 +1,37 @@ package me.aias.service.impl; -import com.google.gson.JsonObject; -import io.milvus.client.*; +import io.milvus.Response.QueryResultsWrapper; +import io.milvus.Response.SearchResultsWrapper; +import io.milvus.client.MilvusClient; +import io.milvus.grpc.DataType; +import io.milvus.grpc.MutationResult; +import io.milvus.grpc.QueryResults; +import io.milvus.grpc.SearchResults; +import io.milvus.param.IndexType; +import io.milvus.param.MetricType; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.*; +import io.milvus.param.dml.DeleteParam; +import io.milvus.param.dml.InsertParam; +import io.milvus.param.dml.QueryParam; +import io.milvus.param.dml.SearchParam; +import io.milvus.param.index.CreateIndexParam; +import io.milvus.param.index.DropIndexParam; +import io.milvus.param.partition.CreatePartitionParam; +import io.milvus.param.partition.DropPartitionParam; +import io.milvus.param.partition.HasPartitionParam; import lombok.extern.slf4j.Slf4j; -import me.aias.common.milvus.MilvusConnector; -import me.aias.domain.DNAInfoDto; +import me.aias.common.milvus.ConnectionPool; import me.aias.service.SearchService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; -/** - * 搜素服务 - * - * @author Calvin - * @date 2021-12-19 - **/ @Slf4j @Service public class SearchServiceImpl implements SearchService { @@ -26,16 +39,19 @@ public class SearchServiceImpl implements SearchService { String host; @Value("${search.port}") - String port; + String port; - @Value("${search.dimension}") - String dimension; + @Value("${search.indexType}") + String indexType; + + @Value("${search.metricType}") + String metricType; @Value("${search.collectionName}") String collectionName; - @Value("${search.indexFileSize}") - String indexFileSize; + @Value("${search.partitionName}") + String partitionName; @Value("${search.nprobe}") String nprobe; @@ -43,246 +59,344 @@ public class SearchServiceImpl implements SearchService { @Value("${search.nlist}") String nlist; - @Autowired - private MilvusConnector milvusConnector; + private static final String ID_FIELD = "id"; + private static final String VECTOR_FIELD = "feature"; - public void initSearchEngine() { - MilvusClient client = milvusConnector.getClient(); - // 检查 collection 是否存在 - HasCollectionResponse hasCollection = this.hasCollection(client, collectionName); - if (hasCollection.hasCollection()) { - this.dropCollection(client, collectionName); - this.dropIndex(client, collectionName); + // 重置向量引擎 + public void clearSearchEngine() { + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient client = connPool.getConnection(); + try { + this.releaseCollection(client); + this.dropPartition(client, partitionName); + this.dropIndex(client); + this.dropCollection(client); + } finally { + returnConnection(connPool, client); } } - // 检查是否存在 collection - public HasCollectionResponse hasCollection(MilvusClient client, String collectionName) { - HasCollectionResponse response = client.hasCollection(collectionName); - return response; + // 初始化向量引擎 + public void initSearchEngine(Integer dimension) { + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient client = connPool.getConnection(); + try { + this.createCollection(client, dimension, 2000); + this.loadCollection(client); + this.createPartition(client, partitionName); + this.createIndex(client); + } finally { + returnConnection(connPool, client); + } + } + + public ConnectionPool getConnectionPool(boolean refresh) { + ConnectionPool connPool = ConnectionPool.getInstance(host, port, refresh); + return connPool; + } + + public MilvusClient getClient(ConnectionPool connPool) { + MilvusClient client = connPool.getConnection(); + return client; + } + + public void returnConnection(ConnectionPool connPool, MilvusClient client) { + // 释放 Milvus client 回连接池 + connPool.returnConnection(client); + // 关闭 Milvus 连接池 + // connPool.closeConnectionPool(); } // 检查是否存在 collection - public HasCollectionResponse hasCollection(String collectionName) { - MilvusClient client = milvusConnector.getClient(); - HasCollectionResponse response = client.hasCollection(collectionName); + public R hasCollection(MilvusClient client) { + R response = client.hasCollection(HasCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); return response; } + public R hasCollection() { + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient client = connPool.getConnection(); + try { + R response = hasCollection(client); + return response; + } finally { + returnConnection(connPool, client); + } + } + // 创建 collection - public Response createCollection( - MilvusClient client, String collectionName, long dimension, long indexFileSize) { - // 选择内积 IP (Inner Product) 作为距离计算方式 MetricType.IP - final MetricType metricType = MetricType.IP; - CollectionMapping collectionMapping = - new CollectionMapping.Builder(collectionName, dimension) - .withIndexFileSize(indexFileSize) - .withMetricType(metricType) - .build(); - Response createCollectionResponse = client.createCollection(collectionMapping); - return createCollectionResponse; + public R createCollection(MilvusClient milvusClient, Integer dimension, long timeoutMiliseconds) { + System.out.println("========== createCollection() =========="); + FieldType fieldType1 = FieldType.newBuilder() + .withName(ID_FIELD) + .withDescription("id") + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(false) // 使用数据库生成的id + .build(); + + FieldType fieldType2 = FieldType.newBuilder() + .withName(VECTOR_FIELD) + .withDescription("embedding") + .withDataType(DataType.FloatVector) + .withDimension(dimension) + .build(); + + + CreateCollectionParam createCollectionReq = CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDescription("info") + .withShardsNum(2) + .addFieldType(fieldType1) + .addFieldType(fieldType2) + .build(); + R response = milvusClient.withTimeout(timeoutMiliseconds, TimeUnit.MILLISECONDS) + .createCollection(createCollectionReq); + return response; } - public Response createCollection(String collectionName, long dimension) { - // 选择内积 IP (Inner Product) 作为距离计算方式 MetricType.IP - final MetricType metricType = MetricType.IP; - CollectionMapping collectionMapping = - new CollectionMapping.Builder(collectionName, dimension) - .withIndexFileSize(Long.parseLong(indexFileSize)) - .withMetricType(metricType) - .build(); - MilvusClient client = milvusConnector.getClient(); - Response createCollectionResponse = client.createCollection(collectionMapping); - return createCollectionResponse; + // 加载 collection + public R loadCollection(MilvusClient milvusClient) { + System.out.println("========== loadCollection() =========="); + R response = milvusClient.loadCollection(LoadCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + return response; + } + + // 释放 collection + public R releaseCollection(MilvusClient milvusClient) { + System.out.println("========== releaseCollection() =========="); + R response = milvusClient.releaseCollection(ReleaseCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + return response; } // 删除 collection - public Response dropCollection(MilvusClient client, String collectionName) { - // Drop collection - Response dropCollectionResponse = client.dropCollection(collectionName); - return dropCollectionResponse; + public R dropCollection(MilvusClient milvusClient) { + System.out.println("========== dropCollection() =========="); + R response = milvusClient.dropCollection(DropCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + return response; } - // 查看 collection 信息 - public Response getCollectionStats(MilvusClient client, String collectionName) { - Response getCollectionStatsResponse = client.getCollectionStats(collectionName); - // if (getCollectionStatsResponse.ok()) { - // // JSON 格式 collection 信息 - // String jsonString = getCollectionStatsResponse.getMessage(); - // System.out.format("Collection 信息: %s\n", jsonString); - // } - return getCollectionStatsResponse; + // 创建 分区 + public R createPartition(MilvusClient milvusClient, String partitionName) { + System.out.println("========== createPartition() =========="); + R response = milvusClient.createPartition(CreatePartitionParam.newBuilder() + .withCollectionName(collectionName) + .withPartitionName(partitionName) + .build()); + return response; } - // 插入向量 - public void insertVectors(String collectionName, List list) { - MilvusClient client = milvusConnector.getClient(); - List vectorIds = new ArrayList<>(); - List> vectors = new ArrayList<>(); - for (DNAInfoDto textInfo : list) { - vectorIds.add(textInfo.getId()); - vectors.add(textInfo.getFeature()); - } - this.insertVectors(client, collectionName, vectorIds, vectors); + // 删除 分区 + public R dropPartition(MilvusClient milvusClient, String partitionName) { + System.out.println("========== dropPartition() =========="); + R response = milvusClient.dropPartition(DropPartitionParam.newBuilder() + .withCollectionName(collectionName) + .withPartitionName(partitionName) + .build()); + return response; } - // 插入向量 - public void insertVectors(String collectionName, Long id, List feature) { - MilvusClient client = milvusConnector.getClient(); - List vectorIds = new ArrayList<>(); - List> vectors = new ArrayList<>(); - vectorIds.add(id); - vectors.add(feature); - this.insertVectors(client, collectionName, vectorIds, vectors); - } - - // 插入向量 - public void insertVectors(String collectionName, List vectorIds, List> vectors) { - MilvusClient client = milvusConnector.getClient(); - this.insertVectors(client, collectionName, vectorIds, vectors); - } - - public InsertResponse insertVectors( - MilvusClient client, String collectionName, List vectorIds, List> vectors) { - // 需要主动指定ID,如:图片的ID,用来关联图片资源,页面显示使用等 - InsertParam insertParam = - new InsertParam.Builder(collectionName) - .withVectorIds(vectorIds) - .withFloatVectors(vectors) - .build(); - - InsertResponse insertResponse = client.insert(insertParam); - // 返回向量ID列表,向量ID如果不主动赋值,系统自动生成并返回 - // List vectorIds = insertResponse.getVectorIds(); - return insertResponse; - } - - // 刷新数据 - public Response flushData(MilvusClient client, String collectionName) { - // Flush data in collection - Response flushResponse = client.flush(collectionName); - return flushResponse; - } - - // 查询向量数量 - public long count(MilvusClient client, String collectionName) { - // 获取数据条数 - CountEntitiesResponse ountEntitiesResponse = client.countEntities(collectionName); - long rows = ountEntitiesResponse.getCollectionEntityCount(); - return rows; - } - - // 根据ID获取向量 - public GetEntityByIDResponse getEntityByID( - MilvusClient client, String collectionName, List vectorIds) { - - GetEntityByIDResponse getEntityByIDResponse = - client.getEntityByID(collectionName, vectorIds.subList(0, 5)); - return getEntityByIDResponse; - } - - // 搜索向量 - public SearchResponse search(String collectionName, long topK, List> vectorsToSearch) { - // 索引类型不同,参数也可能不同,查询文档选择最优参数 - JsonObject searchParamsJson = new JsonObject(); - searchParamsJson.addProperty("nprobe", Integer.parseInt(nprobe)); - SearchParam searchParam = - new SearchParam.Builder(collectionName) - .withFloatVectors(vectorsToSearch) - .withTopK(topK) - .withParamsInJson(searchParamsJson.toString()) - .build(); - - MilvusClient client = milvusConnector.getClient(); - SearchResponse searchResponse = client.search(searchParam); - return searchResponse; - } - - public SearchResponse search( - MilvusClient client, - String collectionName, - int nprobe, - long topK, - List> vectorsToSearch) { - - // 索引类型不同,参数也可能不同,查询文档选择最优参数 - JsonObject searchParamsJson = new JsonObject(); - searchParamsJson.addProperty("nprobe", nprobe); - SearchParam searchParam = - new SearchParam.Builder(collectionName) - .withFloatVectors(vectorsToSearch) - .withTopK(topK) - .withParamsInJson(searchParamsJson.toString()) - .build(); - SearchResponse searchResponse = client.search(searchParam); - return searchResponse; - } - - // 删除向量 - public Response deleteVectorsByIds( - MilvusClient client, String collectionName, List vectorIds) { - Response deleteByIdsResponse = client.deleteEntityByID(collectionName, vectorIds); - // Flush, 使删除数据生效 - Response flushResponse = client.flush(collectionName); - return deleteByIdsResponse; + // 是否存在分区 + public R hasPartition(MilvusClient milvusClient, String partitionName) { + System.out.println("========== hasPartition() =========="); + R response = milvusClient.hasPartition(HasPartitionParam.newBuilder() + .withCollectionName(collectionName) + .withPartitionName(partitionName) + .build()); + return response; } // 创建 index - public Response createIndex(MilvusClient client, String collectionName) { - // 索引类型在配置页面设置 IndexType. IVFLAT - final IndexType indexType = IndexType.IVF_SQ8; - // 每种索引有自己的可选参数 - 在配置页面设置 - JsonObject indexParamsJson = new JsonObject(); - indexParamsJson.addProperty("nlist", Integer.parseInt(nlist)); - Index index = - new Index.Builder(collectionName, indexType) - .withParamsInJson(indexParamsJson.toString()) - .build(); + public R createIndex(MilvusClient milvusClient) { + System.out.println("========== createIndex() =========="); + String INDEX_PARAM = "{\"nlist\":" + nlist + "}"; - Response createIndexResponse = client.createIndex(index); - return createIndexResponse; - } + IndexType INDEX_TYPE; + switch (indexType.toUpperCase()) { + case "IVF_FLAT": + INDEX_TYPE = IndexType.IVF_FLAT; + break; + case "IVF_SQ8": + INDEX_TYPE = IndexType.IVF_SQ8; + break; + case "IVF_PQ": + INDEX_TYPE = IndexType.IVF_PQ; + break; + case "HNSW": + INDEX_TYPE = IndexType.HNSW; + break; + case "ANNOY": + INDEX_TYPE = IndexType.ANNOY; + break; + case "RHNSW_FLAT": + INDEX_TYPE = IndexType.RHNSW_FLAT; + break; + case "RHNSW_PQ": + INDEX_TYPE = IndexType.RHNSW_PQ; + break; + case "RHNSW_SQ": + INDEX_TYPE = IndexType.RHNSW_SQ; + break; + case "BIN_IVF_FLAT": + INDEX_TYPE = IndexType.BIN_IVF_FLAT; + break; + default: + INDEX_TYPE = IndexType.IVF_FLAT; + break; + } - public Response createIndex(String collectionName) { - // 索引类型在配置页面设置 IndexType.IVF_SQ8 IVFLAT - final IndexType indexType = IndexType.IVF_SQ8; - // 每种索引有自己的可选参数 - 在配置页面设置 - JsonObject indexParamsJson = new JsonObject(); - indexParamsJson.addProperty("nlist", Integer.parseInt(nlist)); - Index index = - new Index.Builder(collectionName, indexType) - .withParamsInJson(indexParamsJson.toString()) - .build(); - MilvusClient client = milvusConnector.getClient(); - Response createIndexResponse = client.createIndex(index); - return createIndexResponse; - } - - // 查看索引信息 - public GetIndexInfoResponse getIndexInfo(MilvusClient client, String collectionName) { - GetIndexInfoResponse getIndexInfoResponse = client.getIndexInfo(collectionName); - // System.out.format("索引信息: %s\n",search.service.SearchServiceImpl.getIndexInfo(client, - // collectionName).getIndex().toString()); - return getIndexInfoResponse; + MetricType METRIC_TYPE; + MetricType metricTypeEnum = MetricType.valueOf(metricType.toUpperCase()); + switch (metricTypeEnum) { + case L2: + METRIC_TYPE = MetricType.L2; + break; + case IP: + METRIC_TYPE = MetricType.IP; + break; + case HAMMING: + METRIC_TYPE = MetricType.HAMMING; + break; + case JACCARD: + METRIC_TYPE = MetricType.JACCARD; + break; + case TANIMOTO: + METRIC_TYPE = MetricType.TANIMOTO; + break; + case SUBSTRUCTURE: + METRIC_TYPE = MetricType.SUBSTRUCTURE; + break; + case SUPERSTRUCTURE: + METRIC_TYPE = MetricType.SUPERSTRUCTURE; + break; + default: + METRIC_TYPE = MetricType.L2; + break; + } + R response = milvusClient.createIndex(CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName(VECTOR_FIELD) + .withIndexType(INDEX_TYPE) + .withMetricType(METRIC_TYPE) + .withExtraParam(INDEX_PARAM) + .withSyncMode(Boolean.TRUE) + .build()); + return response; } // 删除 index - public Response dropIndex(MilvusClient client, String collectionName) { - Response dropIndexResponse = client.dropIndex(collectionName); - return dropIndexResponse; + public R dropIndex(MilvusClient milvusClient) { + System.out.println("========== dropIndex() =========="); + R response = milvusClient.dropIndex(DropIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName(VECTOR_FIELD) + .build()); + return response; } - // 压缩 collection - public Response compactCollection(MilvusClient client, String collectionName) { - // 压缩 collection, 从磁盘抹除删除的数据,并在后台重建索引(如果压缩后的数据比indexFileSize还要大) - // 在主动压缩前,数据只是软删除 - Response compactResponse = client.compact(collectionName); - return compactResponse; + // 插入向量 + public R insert(List vectorIds, List> vectors) { + System.out.println("========== insert() =========="); + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient milvusClient = connPool.getConnection(); + try { + List fields = new ArrayList<>(); + fields.add(new InsertParam.Field(ID_FIELD, DataType.Int64, vectorIds)); + fields.add(new InsertParam.Field(VECTOR_FIELD, DataType.FloatVector, vectors)); + + InsertParam insertParam = InsertParam.newBuilder() + .withCollectionName(collectionName) + .withPartitionName(partitionName) + .withFields(fields) + .build(); + R response = milvusClient.insert(insertParam); + return response; + } finally { + returnConnection(connPool, milvusClient); + } } - // 检查 collection 中是否有 partition "tag" - public HasPartitionResponse hasPartition(MilvusClient client, String collectionName, String tag) { - HasPartitionResponse hasPartitionResponse = client.hasPartition(collectionName, tag); - return hasPartitionResponse; + // 查询向量 + // queryExpr = ID_FIELD + " == 60"; + public R query(String expr) { + System.out.println("========== query() =========="); + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient milvusClient = connPool.getConnection(); + try { + List fields = Arrays.asList(ID_FIELD); + QueryParam test = QueryParam.newBuilder() + .withCollectionName(collectionName) + .withExpr(expr) + .withOutFields(fields) + .build(); + R response = milvusClient.query(test); + QueryResultsWrapper wrapper = new QueryResultsWrapper(response.getData()); + System.out.println(ID_FIELD + ":" + wrapper.getFieldWrapper(ID_FIELD).getFieldData().toString()); + System.out.println("Query row count: " + wrapper.getFieldWrapper(ID_FIELD).getRowCount()); + return response; + } finally { + returnConnection(connPool, milvusClient); + } + } + + // 搜索向量 + public R search(Integer topK, List> vectorsToSearch) { + System.out.println("========== searchImage() =========="); + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient milvusClient = connPool.getConnection(); + try { + String SEARCH_PARAM = "{\"nprobe\":" + nprobe + "}"; + + List outFields = Arrays.asList(ID_FIELD); + SearchParam searchParam = SearchParam.newBuilder() + .withCollectionName(collectionName) + .withMetricType(MetricType.IP) + .withOutFields(outFields) + .withTopK(topK) + .withVectors(vectorsToSearch) + .withVectorFieldName(VECTOR_FIELD) +// .withExpr(expr) + .withParams(SEARCH_PARAM) + .build(); + + R response = milvusClient.search(searchParam); + SearchResultsWrapper wrapper = new SearchResultsWrapper(response.getData().getResults()); + for (int i = 0; i < vectorsToSearch.size(); ++i) { + System.out.println("Search result of No." + i); + List scores = wrapper.getIDScore(i); + System.out.println(scores); + System.out.println("Output field data for No." + i); + } + return response; + } finally { + returnConnection(connPool, milvusClient); + } + } + + // 删除向量 + // String deleteExpr = ID_FIELD + " in " + deleteIds.toString(); + public R delete(String expr) { + System.out.println("========== delete() =========="); + ConnectionPool connPool = this.getConnectionPool(false); + MilvusClient milvusClient = connPool.getConnection(); + try { + DeleteParam build = DeleteParam.newBuilder() + .withCollectionName(collectionName) + .withPartitionName(partitionName) + .withExpr(expr) + .build(); + R response = milvusClient.delete(build); + return response; + } finally { + returnConnection(connPool, milvusClient); + } } } diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/tools/MilvusInit.java b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/tools/MilvusInit.java index b422b69c..2a8c54e0 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/tools/MilvusInit.java +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/java/me/aias/tools/MilvusInit.java @@ -1,6 +1,14 @@ package me.aias.tools; -import io.milvus.client.*; +import io.milvus.client.MilvusServiceClient; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.RpcStatus; +import io.milvus.param.collection.DropCollectionParam; +import io.milvus.param.collection.HasCollectionParam; +import io.milvus.param.collection.ReleaseCollectionParam; +import io.milvus.param.index.DropIndexParam; +import io.milvus.param.partition.DropPartitionParam; /** * 搜索引擎初始化工具 @@ -9,49 +17,92 @@ import io.milvus.client.*; * @date 2021-12-12 **/ public class MilvusInit { + private static final MilvusServiceClient milvusClient; - public static void main(String[] args) throws InterruptedException { + static { + ConnectParam connectParam = ConnectParam.newBuilder() + .withHost("127.0.0.1") + .withPort(19530) + .build(); + milvusClient = new MilvusServiceClient(connectParam); + } - String host = "127.0.0.1"; - int port = 19530; - final String collectionName = "dna"; // collection name + private static final String COLLECTION_NAME = "dna";// collection name + private static final String VECTOR_FIELD = "feature"; - MilvusClient client = new MilvusGrpcClient(); - // Connect to Milvus server - ConnectParam connectParam = new ConnectParam.Builder().withHost(host).withPort(port).build(); + public static void main(String[] args) { try { - Response connectResponse = client.connect(connectParam); - } catch (ConnectFailedException e) { + // 检查 collection 是否存在,不存在会抛异常 + hasCollection(); + releaseCollection(); + dropPartition("p1"); + dropIndex(); + dropCollection(); + + } catch (Exception e) { e.printStackTrace(); } - - // 检查 collection 是否存在 - HasCollectionResponse hasCollection = hasCollection(client, collectionName); - if (hasCollection.hasCollection()) { - dropIndex(client, collectionName); - dropCollection(client, collectionName); - } - // 关闭 Milvus 连接 - client.disconnect(); + milvusClient.close(); } // 检查是否存在 collection - public static HasCollectionResponse hasCollection(MilvusClient client, String collectionName) { - HasCollectionResponse response = client.hasCollection(collectionName); + private static R hasCollection() { + System.out.println("========== hasCollection() =========="); + R response = milvusClient.hasCollection(HasCollectionParam.newBuilder() + .withCollectionName(COLLECTION_NAME) + .build()); + handleResponseStatus(response); + System.out.println(response); return response; } // 删除 collection - public static Response dropCollection(MilvusClient client, String collectionName) { - // Drop collection - Response dropCollectionResponse = client.dropCollection(collectionName); - return dropCollectionResponse; + private static R dropCollection() { + System.out.println("========== dropCollection() =========="); + R response = milvusClient.dropCollection(DropCollectionParam.newBuilder() + .withCollectionName(COLLECTION_NAME) + .build()); + System.out.println(response); + return response; + } + + private static R releaseCollection() { + System.out.println("========== releaseCollection() =========="); + R response = milvusClient.releaseCollection(ReleaseCollectionParam.newBuilder() + .withCollectionName(COLLECTION_NAME) + .build()); + handleResponseStatus(response); + System.out.println(response); + return response; } // 删除 index - public static Response dropIndex(MilvusClient client, String collectionName) { - Response dropIndexResponse = client.dropIndex(collectionName); - return dropIndexResponse; + private static R dropIndex() { + System.out.println("========== dropIndex() =========="); + R response = milvusClient.dropIndex(DropIndexParam.newBuilder() + .withCollectionName(COLLECTION_NAME) + .withFieldName(VECTOR_FIELD) + .build()); + handleResponseStatus(response); + System.out.println(response); + return response; + } + + private static R dropPartition(String partitionName) { + System.out.println("========== dropPartition() =========="); + R response = milvusClient.dropPartition(DropPartitionParam.newBuilder() + .withCollectionName(COLLECTION_NAME) + .withPartitionName(partitionName) + .build()); + handleResponseStatus(response); + System.out.println(response); + return response; + } + + private static void handleResponseStatus(R r) { + if (r.getStatus() != R.Status.Success.getCode()) { + throw new RuntimeException(r.getMessage()); + } } } diff --git a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/resources/application.yml b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/resources/application.yml index 70e6cf35..cf5fb4aa 100644 --- a/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/resources/application.yml +++ b/6_biomedicine/dna_sequence_search/dna_sequence_search/src/main/resources/application.yml @@ -23,12 +23,30 @@ file: # 文件大小 /M maxSize: 3000 -##################### 向量引擎 ############################### +# Vector Engine - Milvus search: host: 127.0.0.1 port: 19530 - indexFileSize: 1024 # maximum size (in MB) of each index file - nprobe: 16 + + # https://milvus.io/cn/docs/v2.0.0/build_index.md + # https://milvus.io/cn/docs/v2.0.0/metric.md#floating + # For floating point vectors: + # IVF_FLAT + # IVF_SQ8 + # IVF_PQ + # HNSW + # ANNOY + # RHNSW_FLAT + # RHNSW_PQ + # RHNSW_SQ + # For binary vectors: + # BIN_IVF_FLAT + indexType: IVF_FLAT + # 选择内积 IP (Inner Product) 作为距离计算方式 MetricType.IP + metricType: IP + + nprobe: 256 nlist: 16384 - dimension: 768 #dimension of each vector - collectionName: dna #collection name \ No newline at end of file + size: 768 # 设定词汇表的最大量为768 + collectionName: dna #collection name + partitionName: p1 \ No newline at end of file diff --git a/6_biomedicine/molecular_search/molecular-search/molecular-search.iml b/6_biomedicine/molecular_search/molecular-search/molecular-search.iml deleted file mode 100644 index 8c652311..00000000 --- a/6_biomedicine/molecular_search/molecular-search/molecular-search.iml +++ /dev/null @@ -1,207 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file