CH3 搭建数据库 - SunXiaoXiang/llm-universe GitHub Wiki
词向量及向量知识库
词向量
词向量(Embeddings)是一种将非结构化数据,如单词、句子或者整个文档,转化为实数向量的技术。 词嵌入(word embeddings)来表示文本数据。在词嵌入中,每个单词被转换为一个向量,这个向量捕获了这个单词的语义信息。
词向量的优势
在RAG(Retrieval Augmented Generation,检索增强生成)方面词向量的优势主要有两点:
- 词向量比文字更适合检索。当我们在数据库检索时,如果数据库存储的是文字,主要通过检索关键词(词法搜索)等方法找到相对匹配的数据,匹配的程度是取决于关键词的数量或者是否完全匹配查询句的;但是词向量中包含了原文本的语义信息,可以通过计算问题与数据库中数据的点积、余弦距离、欧几里得距离等指标,直接获取问题与数据在语义层面上的相似度;
- 词向量比其它媒介的综合信息能力更强,当传统数据库存储文字、声音、图像、视频等多种媒介时,很难去将上述多种媒介构建起关联与跨模态的查询方法;但是词向量却可以通过多种向量模型将多种数据映射成统一的向量形式。
一般构建词向量的方法
搭建 RAG 系统时,我们往往可以通过使用嵌入模型来构建词向量,我们可以选择:
- 使用各个公司的 Embedding API;
- 在本地使用嵌入模型将数据构建为词向量。
向量数据库
什么是向量数据库
向量数据库是一种专门用于存储和检索向量数据(embedding)的数据库系统。它与传统的基于关系模型的数据库不同,它主要关注的是向量数据的特性和相似性。 在向量数据库中,数据被表示为向量形式,每个向量代表一个数据项。这些向量可以是数字、文本、图像或其他类型的数据。向量数据库使用高效的索引和查询算法来加速向量数据的存储和检索过程。
向量数据库的原理及核心优势
向量数据库中的数据以向量作为基本单位,对向量进行存储、处理及检索。向量数据库通过计算与目标向量的余弦距离、点积等获取与目标向量的相似度。当处理大量甚至海量的向量数据时,向量数据库索引和查询算法的效率明显高于传统数据库。
主流的向量数据库
Chroma : https://www.trychroma.com/
一个轻量级向量数据库,拥有丰富的功能和简单的 API,具有简单、易用、轻量的优点,但功能相对简单且不支持GPU加速,适合初学者使用。
Weaviate: https://weaviate.io/
一个开源向量数据库。除了支持相似度搜索和最大边际相关性(MMR,Maximal Marginal Relevance)搜索外还可以支持结合多种搜索算法(基于词法搜索、向量搜索)的混合搜索,从而搜索提高结果的相关性和准确性。
Qdrant: https://qdrant.tech/
Qdrant使用 Rust 语言开发,有极高的检索效率和RPS(Requests Per Second),支持本地运行、部署在本地服务器及Qdrant云三种部署模式。且可以通过为页面内容和元数据制定不同的键来复用数据。
使用Embedding API
使用OpenAI API
因没有openai的付费账号,以下代码未运行 GPT embedding mode 有三种 text-embedding-3-large 最好的性能和最贵的价格,当我们搭建的应用需要更好的表现且成本充足的情况下可以使用 text-embedding-3-small 有着较好的性能跟价格,当我们预算有限时可以选择该模型 text-embedding-3-ada-002 是OpenAI上一代的模型,无论在性能还是价格都不如及前两者,因此不推荐使用。
import os
from openai import OpenAI
from dotenv import load_dotenv, find_dotenv
# 读取本地/项目的环境变量。
# find_dotenv()寻找并定位.env文件的路径
# load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())
# 如果你需要通过代理端口访问,你需要如下配置
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'
os.environ["HTTP_PROXY"] = 'http://127.0.0.1:7890'
def openai_embedding(text: str, model: str=None):
# 获取环境变量 OPENAI_API_KEY
api_key=os.environ['OPENAI_API_KEY']
client = OpenAI(api_key=api_key)
# embedding model:'text-embedding-3-small', 'text-embedding-3-large', 'text-embedding-ada-002'
if model == None:
model="text-embedding-3-small"
response = client.embeddings.create(
input=text,
model=model
)
return response
response = openai_embedding(text='要生成 embedding 的输入文本,字符串形式。')
API返回的数据为json
格式,除object
向量类型外还有存放数据的data
、embedding model 型号model
以及本次 token 使用情况usage
等数据,具体如下所示
{
"object": "list",
"data": [
{
"object": "embedding",
"index": 0,
"embedding": [
-0.006929283495992422,
... (省略)
-4.547132266452536e-05,
],
}
],
"model": "text-embedding-3-small",
"usage": {
"prompt_tokens": 5,
"total_tokens": 5
}
}
我们可以调用response的object来获取embedding的类型。
print(f'返回的embedding类型为:{response.object}')
文心千帆API
import os
import requests
import json
from dotenv import load_dotenv, find_dotenv
# 读取本地/项目的环境变量。
# find_dotenv() 寻找并定位 .env 文件的路径
# load_dotenv() 读取该 .env 文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())
def wenxin_embedding(text: str):
# 获取环境变量 wenxin_api_key、wenxin_secret_key
api_key = os.environ['QIANFAN_AK']
secret_key = os.environ['QIANFAN_SK']
# 使用API Key、Secret Key向https://aip.baidubce.com/oauth/2.0/token 获取Access token
url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={0}&client_secret={1}".format(api_key, secret_key)
payload = json.dumps("")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
# 通过获取的Access token 来embedding text
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings/embedding-v1?access_token=" + str(response.json().get("access_token"))
input = []
input.append(text)
payload = json.dumps({
"input": input
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
return json.loads(response.text)
# text应为List(string)
text = "要生成 embedding 的输入文本,字符串形式。"
response = wenxin_embedding(text=text)
print('本次embedding id为:{}'.format(response['id']))
print('本次embedding产生时间戳为:{}'.format(response['created']))
print('返回的embedding类型为:{}'.format(response['object']))
print('embedding长度为:{}'.format(len(response['data'][0]['embedding'])))
print('embedding(前10)为:{}'.format(response['data'][0]['embedding'][:10]))
备注:没有千帆大模型的付费api,上述代码Error Code 17,Error MSG:Open api daily request limit reached。详情是:每天请求量超限额,已上线计费的接口,请直接在控制台开通计费,调用量不受限制,按调用量阶梯计费;未上线计费的接口,请在百度云控制台内提交工单反馈。 这部分在千帆大模型的调试界面也一样,无法验证。等充值后再测试。
智谱API
from zhipuai import ZhipuAI
def zhipu_embedding(text: str):
api_key = os.environ['ZHIPUAI_API_KEY']
client = ZhipuAI(api_key=api_key)
response = client.embeddings.create(
model="embedding-2",
input=text,
)
return response
text = '要生成 embedding 的输入文本,字符串形式。'
response = zhipu_embedding(text=text)
response为zhipuai.types.embeddings.EmbeddingsResponded
类型,我们可以调用object
、data
、model
、usage
来查看response的embedding类型、embedding、embedding model及使用情况。
print(f'response类型为:{type(response)}')
print(f'embedding类型为:{response.object}')
print(f'生成embedding的model为:{response.model}')
print(f'生成的embedding长度为:{len(response.data[0].embedding)}')
print(f'embedding(前10)为: {response.data[0].embedding[:10]}')
备注:智谱送的token已经过期了,上述代码并没有验证。
Azure OpenAI
参考Azure 的官方Demo,结合教材OpenAI的例子 ,做了Azure OpenAI的尝试 Azure的例子 地址: https://learn.microsoft.com/zh-cn/azure/ai-services/openai/how-to/embeddings?tabs=python-new
import os
from openai import AzureOpenAI
client = AzureOpenAI(
api_key = os.getenv("AZURE_OPENAI_API_KEY"),
api_version = "2024-02-01",
azure_endpoint =os.getenv("AZURE_OPENAI_ENDPOINT")
)
response = client.embeddings.create(
input = "Your text string goes here",
model= "text-embedding-ada-002"
)
print(response.model_dump_json(indent=2))
将api_key等均从env配置文件中读取,并把代码重构后,上述代码改为
import os
from openai import AzureOpenAI
from dotenv import load_dotenv, find_dotenv
from pprint import pprint
# 读取本地/项目的环境变量。
# find_dotenv()寻找并定位.env文件的路径
# load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())
def azure_openai_embedding(text: str, model: str=None):
api_key = os.getenv("AZURE_OPENAI_API_KEY")
api_version = os.environ['API_VERSION']
azure_endpoint =os.getenv("AZURE_OPENAI_ENDPOINT")
client = AzureOpenAI(api_key=api_key, api_version=api_version, azure_endpoint=azure_endpoint)
if model == None:
model = "text-embedding-ada-002"
response = client.embeddings.create(
input=text,
model=model
)
return response
response = azure_openai_embedding(text="这段是需要embedding的文本。", model="text-embedding-3-small")
print(response.model_dump_json(indent=2))
response = azure_openai_embedding(text="这段是需要embedding的文本。", model="text-embedding-3-large")
print(response.model_dump_json(indent=2))
response = azure_openai_embedding(text="这段是需要embedding的文本。", model="text-embedding-ada-002")
print(response.model_dump_json(indent=2))
print(f'返回的embedding类型为:{response.object}')
print(f'embedding长度为:{len(response.data[0].embedding)}')
print(f'embedding(前10)为:{response.data[0].embedding[:10]}')
print(f'本次embedding model为:{response.model}')
print(f'本次token使用情况为:{response.usage}')
实际跑过一次后发现 3个接口model的,small接口 embedding 长度1536,消耗token=9,large接口embedding 3072,消耗token=9,ada-002接口 embedding 长度1536,消耗token=9。可能embedding文本太短,并未有速度上的差异。
数据处理
教材中使用了部分Datawhale的开源课程作为处理的素材 知识库源数据放置在../data_base/knowledge_db 目录下
数据读取
PDF文档
使用 LangChain 的 PyMuPDFLoader 来读取知识库的 PDF 文件。PyMuPDFLoader 是 PDF 解析器中速度最快的一种,结果会包含 PDF 及其页面的详细元数据,并且每页返回一个文档。
from langchain.document_loaders.pdf import PyMuPDFLoader
# 创建一个 PyMuPDFLoader Class 实例,输入为待加载的 pdf 文档路径
loader = PyMuPDFLoader("../../data_base/knowledge_db/pumkin_book/pumpkin_book.pdf")
# 调用 PyMuPDFLoader Class 的函数 load 对 pdf 文件进行加载
pdf_pages = loader.load()
文档加载后储存在 pages
变量中:
page
的变量类型为List
- 打印
pages
的长度可以看到 pdf 一共包含多少页
print(f"载入后的变量类型为:{type(pdf_pages)},", f"该 PDF 一共包含 {len(pdf_pages)} 页")
page
中的每一元素为一个文档,变量类型为 langchain_core.documents.base.Document
, 文档变量类型包含两个属性
page_content
包含该文档的内容。meta_data
为文档相关的描述性数据。
pdf_page = pdf_pages[1]
print(f"每一个元素的类型:{type(pdf_page)}.",
f"该文档的描述性数据:{pdf_page.metadata}",
f"查看该文档的内容:\n{pdf_page.page_content}",
sep="\n------\n")
MD 文档
from langchain.document_loaders.markdown import UnstructuredMarkdownLoader
loader = UnstructuredMarkdownLoader("../../data_base/knowledge_db/prompt_engineering/1. 简介 Introduction.md")
md_pages = loader.load()
print(f"载入后的变量类型为:{type(md_pages)},", f"该 Markdown 一共包含 {len(md_pages)} 页")
md_page = md_pages[0]
print(f"每一个元素的类型:{type(md_page)}.",
f"该文档的描述性数据:{md_page.metadata}",
f"查看该文档的内容:\n{md_page.page_content[0:][:200]}",
sep="\n------\n")
数据清洗
我们期望知识库的数据尽量是有序的、优质的、精简的,因此我们要删除低质量的、甚至影响理解的文本数据。
可以看到上文中读取的pdf文件不仅将一句话按照原文的分行添加了换行符\n
,也在原本两个符号中间插入了\n
,我们可以使用正则表达式匹配并删除掉\n
import re
pattern = re.compile(r'[^\u4e00-\u9fff](\n)[^\u4e00-\u9fff]', re.DOTALL)
pdf_page.page_content = re.sub(pattern, lambda match: match.group(0).replace('\n', ''), pdf_page.page_content)
print(pdf_page.page_content)
进一步分析数据,我们发现数据中还有不少的•
和空格,我们的简单实用replace方法即可。
pdf_page.page_content = pdf_page.page_content.replace('•', '')
pdf_page.page_content = pdf_page.page_content.replace(' ', '')
print(pdf_page.page_content)
md文件每一段中间隔了一个换行符,我们同样可以使用replace方法去除。
md_page.page_content = md_page.page_content.replace('\n\n', '\n')
print(md_page.page_content)
文档分割
由于单个文档的长度往往会超过模型支持的上下文,导致检索得到的知识太长超出模型的处理能力,因此,在构建向量知识库的过程中,我们往往需要对文档进行分割,将单个文档按长度或者按固定的规则分割成若干个 chunk,然后将每个 chunk 转化为词向量,存储到向量数据库中。
在检索时,我们会以 chunk 作为检索的元单位,也就是每一次检索到 k 个 chunk 作为模型可以参考来回答用户问题的知识,这个 k 是我们可以自由设定的。
Langchain 中文本分割器都根据 chunk_size
(块大小)和 chunk_overlap
(块与块之间的重叠大小)进行分割。
- chunk_size 指每个块包含的字符或 Token (如单词、句子等)的数量
- chunk_overlap 指两个块之间共享的字符数量,用于保持上下文的连贯性,避免分割丢失上下文信息
Langchain 提供多种文档分割方式,区别在怎么确定块与块之间的边界、块由哪些字符/token组成、以及如何测量块大小
- RecursiveCharacterTextSplitter(): 按字符串分割文本,递归地尝试按不同的分隔符进行分割文本。
- CharacterTextSplitter(): 按字符来分割文本。
- MarkdownHeaderTextSplitter(): 基于指定的标题来分割markdown 文件。
- TokenTextSplitter(): 按token来分割文本。
- SentenceTransformersTokenTextSplitter(): 按token来分割文本
- Language(): 用于 CPP、Python、Ruby、Markdown 等。
- NLTKTextSplitter(): 使用 NLTK(自然语言工具包)按句子分割文本。
- SpacyTextSplitter(): 使用 Spacy按句子的切割文本。
''' * RecursiveCharacterTextSplitter 递归字符文本分割
RecursiveCharacterTextSplitter 将按不同的字符递归地分割(按照这个优先级["\n\n", "\n", " ", ""]),
这样就能尽量把所有和语义相关的内容尽可能长时间地保留在同一位置
RecursiveCharacterTextSplitter需要关注的是4个参数:
* separators - 分隔符字符串数组
* chunk_size - 每个文档的字符数量限制
* chunk_overlap - 两份文档重叠区域的长度
* length_function - 长度计算函数 '''
#导入文本分割器
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 知识库中单段文本长度
CHUNK_SIZE = 500
# 知识库中相邻文本重合长度
OVERLAP_SIZE = 50
# 使用递归字符文本分割器
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=OVERLAP_SIZE
)
text_splitter.split_text(pdf_page.page_content[0:1000])
split_docs = text_splitter.split_documents(pdf_pages)
print(f"切分后的文件数量:{len(split_docs)}")
print(f"切分后的字符数(可以用来大致评估 token 数):{sum([len(doc.page_content) for doc in split_docs])}")
搭建并使用向量数据库
前序配置
import os
from dotenv import load_dotenv, find_dotenv
# 读取本地/项目的环境变量。
# find_dotenv()寻找并定位.env文件的路径
# load_dotenv()读取该.env文件,并将其中的环境变量加载到当前的运行环境中
# 如果你设置的是全局的环境变量,这行代码则没有任何作用。
_ = load_dotenv(find_dotenv())
# 如果你需要通过代理端口访问,你需要如下配置
os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'
os.environ["HTTP_PROXY"] = 'http://127.0.0.1:7890'
# 获取folder_path下所有文件路径,储存在file_paths里
file_paths = []
folder_path = '../../data_base/knowledge_db'
for root, dirs, files in os.walk(folder_path):
for file in files:
file_path = os.path.join(root, file)
file_paths.append(file_path)
print(file_paths[:3])
from langchain.document_loaders.pdf import PyMuPDFLoader
from langchain.document_loaders.markdown import UnstructuredMarkdownLoader
# 遍历文件路径并把实例化的loader存放在loaders里
loaders = []
for file_path in file_paths:
file_type = file_path.split('.')[-1]
if file_type == 'pdf':
loaders.append(PyMuPDFLoader(file_path))
elif file_type == 'md':
loaders.append(UnstructuredMarkdownLoader(file_path))
from langchain.document_loaders.pdf import PyMuPDFLoader
from langchain.document_loaders.markdown import UnstructuredMarkdownLoader
# 遍历文件路径并把实例化的loader存放在loaders里
loaders = []
for file_path in file_paths:
file_type = file_path.split('.')[-1]
if file_type == 'pdf':
loaders.append(PyMuPDFLoader(file_path))
elif file_type == 'md':
loaders.append(UnstructuredMarkdownLoader(file_path))
# 下载文件并存储到text
texts = []
for loader in loaders: texts.extend(loader.load())
载入后的变量类型为langchain_core.documents.base.Document
, 文档变量类型同样包含两个属性
page_content
包含该文档的内容。meta_data
为文档相关的描述性数据。
text = texts[1]
print(f"每一个元素的类型:{type(text)}.",
f"该文档的描述性数据:{text.metadata}",
f"查看该文档的内容:\n{text.page_content[0:]}",
sep="\n------\n")
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 切分文档
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, chunk_overlap=50)
split_docs = text_splitter.split_documents(texts)
构建Chroma向量库
Langchain 集成了超过 30 个不同的向量存储库。我们选择 Chroma 是因为它轻量级且数据存储在内存中,这使得它非常容易启动和开始使用。
LangChain 可以直接使用 OpenAI 和百度千帆的 Embedding
# 使用 OpenAI Embedding
# from langchain.embeddings.openai import OpenAIEmbeddings
# 使用百度千帆 Embedding
# from langchain.embeddings.baidu_qianfan_endpoint import QianfanEmbeddingsEndpoint
# 使用我们自己封装的智谱 Embedding,需要将封装代码下载到本地使用
from zhipuai_embedding import ZhipuAIEmbeddings
# 定义 Embeddings
# embedding = OpenAIEmbeddings()
embedding = ZhipuAIEmbeddings()
# embedding = QianfanEmbeddingsEndpoint()
# 定义持久化路径
persist_directory = '../../data_base/vector_db/chroma'
!rm -rf '../../data_base/vector_db/chroma' # 删除旧的数据库文件(如果文件夹中有文件的话),windows电脑请手动删除
from langchain.vectorstores.chroma import Chroma
vectordb = Chroma.from_documents(
documents=split_docs[:20], # 为了速度,只选择前 20 个切分的 doc 进行生成;使用千帆时因QPS限制,建议选择前 5 个doc
embedding=embedding,
persist_directory=persist_directory # 允许我们将persist_directory目录保存到磁盘上
)
vectordb.persist()
向量检索
相似度检索
Chroma的相似度搜索使用的是余弦距离
当你需要数据库返回严谨的按余弦相似度排序的结果时可以使用similarity_search
函数。
question="什么是大语言模型"
sim_docs = vectordb.similarity_search(question,k=3)
print(f"检索到的内容数:{len(sim_docs)}")
for i, sim_doc in enumerate(sim_docs):
print(f"检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")
MMR检索
只考虑检索出内容的相关性会导致内容过于单一,可能丢失重要信息。
最大边际相关性 (MMR, Maximum marginal relevance
) 可以帮助我们在保持相关性的同时,增加内容的丰富度。
核心思想是在已经选择了一个相关性高的文档之后,再选择一个与已选文档相关性较低但是信息丰富的文档。这样可以在保持相关性的同时,增加内容的多样性,避免过于单一的结果。
mmr_docs = vectordb.max_marginal_relevance_search(question,k=3)
for i, sim_doc in enumerate(mmr_docs):
print(f"MMR 检索到的第{i}个内容: \n{sim_doc.page_content[:200]}", end="\n--------------\n")