向量数据库(Mivus)学习指南及简单操作

github代码icon-default.png?t=N7T8https://github.com/Xiaomkuaipao/Milvus

b站讲解icon-default.png?t=N7T8https://space.bilibili.com/430576513

前记:

  1. Milvus和Weaviate-client 这俩库有冲突,单开两个环境;

  2. 本次测试中,Milvus和Weaviate在查询精度上完全相同;

  3. 小数据集搜索中,参数设置相同时,Milvus的查询时间是Weaviate的三倍左右,即Weaviate更快;

  4. Milvus在介绍中表明,在处理大规模数据集的向量搜索具有高性能,本次测试时间可能和数据集小有关;

Milvus

一、参考链接

Milvus中文文档

Milvus官方文档

集群使用(知乎)

非常详细的介绍CSDN

二、概念

2.1 优缺点(个人观点)

1. 优点
  • 操作相对简单

  • 强可视化

2. 缺点
  • 查询速度慢:数据量少的情况下,用时是Weaviate的3倍多(156条问答对,312次查询)

2.2 设计原理

作为云原生向量数据库,Milvus的设计通过分离存储与计算来实现。为了增强弹性和灵活性,Milvus中的所有组件都是无状态的。

系统分为四个层次:

  • 访问层:访问层由一组无状态代理组成,作为系统的前层和用户端点。

  • 协调器服务:协调器服务将任务分配给工作节点,并充当系统的大脑。

  • 工作节点:工作节点是系统的手臂和腿部,是执行来自协调器服务的指令并执行用户触发的DML/DDL命令的“哑执行者”。

  • 存储:存储是系统的骨头,负责数据持久化。它包括元数据存储、日志代理和对象存储。

2.3 名词概念

  • 集合(Collection):可以包含一个或多个Segment

  • 段(Segment):一个Segment相当于是一个表

  • 实体(Entity):相当于是一个字段(行),Milvus不支持主键去重,可能会存在重复的主键

  • 字段(Field): 组成实体的单元

  • 集群(cluster):实现高可用性和易扩展性

  • Schema:定义数据类型和数据属性的元信息。定义集合的字段、启用自动 ID(主键)分配,并包括集合描述。集合 schema 中还包括定义字段名称、数据类型和其他属性的字段模式。(定义方式比Weaviate简单好多)

  • 向量相似度搜索:通常采用近似最近邻算法(ANN)搜索,Weaviate中也是

2.4 索引类型[测试时用的IVF_FLAT,试一下别的方式]

大多数由Milvus支持的向量索引类型使用近似最近邻搜索(ANNS),包括:

  • FLAT:FLAT最适合于在小规模,百万级数据集上寻求完全准确和精确的搜索结果的场景。

  • IVF_FLAT:IVF_FLAT是一种量化索引,最适合于在精度和查询速度之间寻求理想平衡的场景。

  • IVF_SQ8:IVF_SQ8是一种量化索引,最适合于在磁盘、CPU和GPU内存消耗非常有限的场景中显著减少资源消耗。

  • IVF_PQ:IVF_PQ是一种量化索引,最适合于在高查询速度的情况下以牺牲精度为代价的场景。

  • HNSW:HNSW是一种基于图形的索引,最适合于对搜索效率有很高需求的场景。

  • ANNOY:ANNOY是一种基于树形结构的索引,最适合于寻求高召回率的场景。

2.5 相似度度量

在 Milvus 中,相似度度量用于衡量向量之间的相似性。选择一个好的距离度量方法可以显著提高分类和聚类的性能。根据输入数据的形式,选择特定的相似度度量方法可以获得最优的性能。

对于浮点嵌入,通常使用以下指标:

  • 欧氏距离(L2):该指标通常用于计算机视觉领域(CV)。

  • 内积(IP):该指标通常用于自然语言处理领域(NLP)。

在二元嵌入中广泛使用的度量标准包括:

  • 哈明距离:这个度量标准通常用于自然语言处理(NLP)领域。

  • 杰卡德距离:这个度量标准通常用于分子相似性搜索领域。

  • 塔尼莫托距离:这个度量标准通常用于分子相似性搜索领域。

  • 超结构距离:这个度量标准通常用于搜索分子的类似超结构。

  • 亚结构距离:这个度量标准通常用于搜索分子的类似亚结构。

三、安装和使用

3.1 安装

  • milvus

  • 我安装时候参考的链接,一次成功

  • 单机版的docker安装方法(官方链接)

wget https://github.com/milvus-io/milvus/releases/download/v2.3.4/milvus-standalone-docker-compose.yml -O docker-compose.yml
  • 图形化界面安装

  • 软件位置,下载attu-Setup-***.exe直接点击下一步进行安装即可

3.2 使用

其中,第1、2、3步是固定顺序,做完才能进行其他操作。

1. 先连接上数据库

# 先连接上数据库,连接到本地ip 127.0.0.1 
from pymilvus import (connections, utility, FieldSchema, CollectionSchema, DataType, Collection) 
connections.connect("default", host="localhost", port="19530", db_name='KnowledgeBase')

2. 创建表

from pymilvus import (
    connections,
    utility,
    FieldSchema, CollectionSchema, DataType,
    Collection, loading_progress,
)

# 本地连接ip 127.0.0.1
connections.connect("default", host="localhost", port="19530", db_name='KnowledgeBase')

fields = [
    FieldSchema(name="uid", dtype=DataType.INT64, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="Question", dtype=DataType.VARCHAR,max_length=5000),
    FieldSchema(name="embeddings_Q", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="Answer", dtype=DataType.VARCHAR,max_length=5000)
]
# 表的field块以及对表的描述
schema = CollectionSchema(fields, "存储向量数据") 

# 表名,Strong是只要更新用户就能看到
docs_data = Collection("Milvus_Test", schema, consistency_level="Strong")  

3. 创建Index

"""
为attribute_name创建索引
:param Collection_Name:连接的表名,即段(Segment)名
:param attribute_name: 加入索引的属性名,即字段(Field)名, 列名。
:param index: 索引的属性
"""
def Index_create(Collection_Name: str, attribute_name: str, index: dict):
    try:
        collection = Collection(Collection_Name)
        collection.create_index(field_name=attribute_name, index_params=index)
    except Exception as e:
        print(e)
    else:
        print("Index插入成功")    
    
index = {
    "index_type": "IVF_FLAT",  # 适合于在精度和查询速度之间寻求理想平衡的场景,试一试别的索引方式,再查一下
    "metric_type": "L2",  # 距离,这个更适合图像,后面再试试其它的
    "params": {"nlist": 128},  # 倒排文件中创建128个倒排列表(inverted lists),大nlist值有助于提高搜索速度,但会增加索引的内存消耗
}
Index_create("Milvus_Test", "embeddings_Q", index)

4. 插入数据

# vector.py
def Insert_vector(id_list: list, Q_list: list, A_list: list, embeddings_Q, Collection):
    """
    id_list:key list
    Q_list:Question list
    A_list:Answer list
    embeddings_Q: Question embedding list
    Collection: 建立好的连接
    """
    try:
        collection = Collection

        for id_key, sentence_Q, embedding_Q, sentence_A in zip(id_list, Q_list, embeddings_Q, A_list):
            # sentence:原文
            # embedding:转换后的向量
            entities = [[id_key], [sentence_Q], [embedding_Q], [sentence_A]]
            collection.insert(entities)
            print(f"编号{id_key}插入完毕")
        collection.flush()

    except Exception as e:
        print(e)
    
    
# InsertVector.py    
import time
import pandas as pd
from pymilvus import (connections, utility, FieldSchema, CollectionSchema, DataType, Collection)
from langchain_community.embeddings import HuggingFaceEmbeddings


csv_file_path = r'D:\TTShixi\Code\MilvusTest\pythonProject\MilvusTest\QA_Pair.csv'

model = HuggingFaceEmbeddings(model_name="moka-ai/m3e-base", model_kwargs={"device": "cpu"})

Collection_Name = 'Milvus_Test'
collection = Collection(Collection_Name)

# 读取数据
df = pd.read_csv(csv_file_path, index_col=0)

# 分配数据 list
sentenceQ_data = df.Q.values
sentenceA_data = df.A.values
id_list = df.index

# 计算需要的embedding列的list
sentenceQ_embeddings = model.embed_documents(sentenceQ_data)

start_time = time.time()
Insert_vector(id_list, sentenceQ_data, sentenceA_data, sentenceQ_embeddings, Collection=collection)
end_time = time.time()
print(f"插入数据用时为: {end_time - start_time}秒, 平均一条的插入时间为: {(end_time - start_time)/len(id_list)}")

5. 搜索

# vector.py
def Similarity_search(collection, Query_embeding, anns_field, k, search_params) -> list:
    """
    全库检索
    :param Query: 问题
    :param Collection_Name: 表名
    :param k: 保留结果数
    :param anns_field: 要搜索的field
    :return:
    """
    try:
        result = collection.search([Query_embeding], anns_field=anns_field, param=search_params, limit=k, output_fields=["uid", "Question", "Answer"])
        # 可以拿result[0].distance来限制距离,给一个临界值,超过之后才能匹配上
        return result[0][0].fields['uid']

    except Exception as e:
        result = '程序异常中断,Error'
        print(result)
        
        
        
# Similarity_Search.py
import time
# from vector import Similarity_search
from langchain.embeddings import HuggingFaceEmbeddings
import pandas as pd
from pymilvus import (connections, utility, FieldSchema, CollectionSchema, DataType, Collection)

csv_file_path = r'D:\TTShixi\Code\MilvusTest\pythonProject\vectorstores\Weaviate_Test\QA_上海证券交易所.csv'

anns_field = "embeddings_Q"

model = HuggingFaceEmbeddings(model_name="moka-ai/m3e-base", model_kwargs={"device": "cpu"})
Collection_Name = 'Milvus_Test'
collection = Collection(Collection_Name)
# collection.load()

df = pd.read_csv(csv_file_path, index_col=0)
sentence_data_QS1 = df.QS1.values
sentence_data_QS2 = df.QS2.values

num_all = len(sentence_data_QS1)
num_cor = 0

search_params = {
    "metric_type": "L2",  # 设置距离度量方式为L2
    "params": {"nprobe": 10},  # 从倒排列表中查找 10 个最相近的候选项进行精确的距离计算
}

start = time.time()
# i应该改为df的行号
for i in range(num_all):
    query = model.embed_documents([sentence_data_QS1[i]])[0]
    if (i+1) == Similarity_search(collection, query, anns_field=anns_field, k=1, search_params=search_params):
        num_cor += 1
    query = model.embed_documents([sentence_data_QS2[i]])[0]
    if (i+1) == Similarity_search(collection, query, anns_field=anns_field, k=1, search_params=search_params):
        num_cor += 1
    print(f"测到第{i+1}个了")

end = time.time()

print(f"查询时间为:{end-start}秒,平均每条的查询时间为{(end-start)/num_all/2}秒")
print(f"正确率为{num_cor/num_all/2*100}%")

6. 删除实体

# vector.py
def Delete_Entity(Collection, id: int) -> bool:
    """
    删除表里的某一行
    :param Collection: 连接过的直接传过来
    :param id: key值
    :return: 布尔值
    也可以通过匹配问题,然后找到这个问题的uid之后再删,后续可以改
    """
    try:
        collection = Collection
        expr = f"uid in [{id}]"
        collection.delete(expr)  # 可以指定分区
        return True

    except Exception as e:
        print(e)
        return False
        
# Entity_delete.py
# from vector import Delete_Entity
from pymilvus import (connections, utility, FieldSchema, CollectionSchema, DataType, Collection)
collection = Collection('Milvus_Test')
for i in range(156):
    de = Delete_Entity(Collection=collection, id=i)
    if de:
        print(f"第{i}条数据删除成功")
    else:
        print(f"第{i}条数据删除失败")
collection.flush()

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/9cbc3bf5db.html