Using Vector Database Adapters
Using Elasticsearch
To use Elasticsearch, you need to install the elasticsearch
package.
import embed_anything
import os
from typing import Dict, List
from embed_anything import EmbedData
from embed_anything.vectordb import Adapter
from embed_anything import BertConfig, EmbedConfig
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ElasticsearchAdapter(Adapter):
def __init__(self, api_key: str, cloud_id: str, index_name: str = "anything"):
self.es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)
self.index_name = index_name
def create_index(
self, dimension: int, metric: str, mappings={}, settings={}, **kwargs
):
if "index_name" in kwargs:
self.index_name = kwargs["index_name"]
self.es.indices.create(
index=self.index_name, mappings=mappings, settings=settings
)
def convert(self, embeddings: List[List[EmbedData]]) -> List[Dict]:
data = []
for embedding in embeddings:
data.append(
{
"text": embedding.text,
"embeddings": embedding.embedding,
"metadata": {
"file_name": embedding.metadata["file_name"],
"modified": embedding.metadata["modified"],
"created": embedding.metadata["created"],
},
}
)
return data
def delete_index(self, index_name: str):
self.es.indices.delete(index=index_name)
def gendata(self, data):
for doc in data:
yield doc
def upsert(self, data: List[Dict]):
data = self.convert(data)
bulk(client=self.es, index="anything", actions=self.gendata(data))
index_name = "anything"
elastic_api_key = os.environ.get("ELASTIC_API_KEY")
elastic_cloud_id = os.environ.get("ELASTIC_CLOUD_ID")
# Initialize the ElasticsearchAdapter Class
elasticsearch_adapter = ElasticsearchAdapter(
api_key=elastic_api_key,
cloud_id=elastic_cloud_id,
index_name=index_name,
)
# Prase PDF and insert documents into Elasticsearch.
bert_config = BertConfig(
model_id="sentence-transformers/all-MiniLM-L6-v2",
chunk_size=100,
buffer_size=200,
)
embed_config = EmbedConfig(bert=bert_config)
data = embed_anything.embed_file(
"/path/to/my-file.pdf",
embeder="Bert",
adapter=elasticsearch_adapter,
config=embed_config,
)
# Create an Index with explicit mappings.
mappings = {
"properties": {
"embeddings": {"type": "dense_vector", "dims": 384},
"text": {"type": "text"},
}
}
settings = {}
elasticsearch_adapter.create_index(
dimension=384,
metric="cosine",
mappings=mappings,
settings=settings,
)
# Delete an Index
elasticsearch_adapter.delete_index(index_name=index_name)
Using Weaviate
To use Weaviate, you need to install the weaviate-client
package.
import weaviate, os
import weaviate.classes as wvc
from tqdm.auto import tqdm
import embed_anything
from embed_anything import EmbedData
from embed_anything.vectordb import Adapter
import textwrap
## Weaviate Adapter
from typing import List
class WeaviateAdapter(Adapter):
def __init__(self, api_key, url):
super().__init__(api_key)
self.client = weaviate.connect_to_weaviate_cloud(
cluster_url=url, auth_credentials=wvc.init.Auth.api_key(api_key)
)
if self.client.is_ready():
print("Weaviate is ready")
def create_index(self, index_name: str):
self.index_name = index_name
self.collection = self.client.collections.create(
index_name, vectorizer_config=wvc.config.Configure.Vectorizer.none()
)
return self.collection
def convert(self, embeddings: List[EmbedData]):
data = []
for embedding in embeddings:
property = embedding.metadata
property["text"] = embedding.text
data.append(
wvc.data.DataObject(properties=property, vector=embedding.embedding)
)
return data
def upsert(self, data_):
data_ = self.convert(data_)
self.client.collections.get(self.index_name).data.insert_many(data_)
def delete_index(self, index_name: str):
self.client.collections.delete(index_name)
URL = "URL"
API_KEY = "API_KEY"
weaviate_adapter = WeaviateAdapter(API_KEY, URL)
# create index
index_name = "Test_index"
if index_name in weaviate_adapter.client.collections.list_all():
weaviate_adapter.delete_index(index_name)
weaviate_adapter.create_index("Test_index")
# model id and embed image directory
model = embed_anything.EmbeddingModel.from_pretrained_hf(
embed_anything.WhichModel.Bert,
model_id="sentence-transformers/all-MiniLM-L12-v2",
)
data = embed_anything.embed_directory(
"test_files", embeder=model, adapter=weaviate_adapter
)
query_vector = embed_anything.embed_query(["What is attention"], embeder=model)[
0
].embedding
response = weaviate_adapter.collection.query.near_vector(
near_vector=query_vector,
limit=2,
return_metadata=wvc.query.MetadataQuery(certainty=True),
)
for i in range(len(response.objects)):
print(response.objects[i].properties["text"])
for res in response.objects:
print(textwrap.fill(res.properties["text"], width=120), end="\n\n")
Using Pinecone
To use Pinecone, you need to install the pinecone
package.
import re
from typing import Dict, List
import uuid
import embed_anything
import os
from embed_anything.vectordb import Adapter
from pinecone import Pinecone, ServerlessSpec
from embed_anything import EmbedData, EmbeddingModel, WhichModel, TextEmbedConfig
class PineconeAdapter(Adapter):
"""
Adapter class for interacting with Pinecone, a vector database service.
"""
def __init__(self, api_key: str):
"""
Initializes a new instance of the PineconeAdapter class.
Args:
api_key (str): The API key for accessing the Pinecone service.
"""
super().__init__(api_key)
self.pc = Pinecone(api_key=self.api_key)
self.index_name = None
def create_index(
self,
dimension: int,
metric: str = "cosine",
index_name: str = "anything",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
):
"""
Creates a new index in Pinecone.
Args:
dimension (int): The dimensionality of the embeddings.
metric (str, optional): The distance metric to use for similarity search. Defaults to "cosine".
index_name (str, optional): The name of the index. Defaults to "anything".
spec (ServerlessSpec, optional): The serverless specification for the index. Defaults to AWS in us-east-1 region.
"""
self.index_name = index_name
self.pc.create_index(
name=index_name, dimension=dimension, metric=metric, spec=spec
)
def delete_index(self, index_name: str):
"""
Deletes an existing index from Pinecone.
Args:
index_name (str): The name of the index to delete.
"""
self.pc.delete_index(name=index_name)
def convert(self, embeddings: List[EmbedData]) -> List[Dict]:
"""
Converts a list of embeddings into the required format for upserting into Pinecone.
Args:
embeddings (List[EmbedData]): The list of embeddings to convert.
Returns:
List[Dict]: The converted data in the required format for upserting into Pinecone.
"""
data_emb = []
for embedding in embeddings:
data_emb.append(
{
"id": str(uuid.uuid4()),
"values": embedding.embedding,
"metadata": {
"text": embedding.text,
"file": re.split(
r"/|\\", embedding.metadata.get("file_name", "")
)[-1],
},
}
)
return data_emb
def upsert(self, data: List[Dict]):
"""
Upserts data into the specified index in Pinecone.
Args:
data (List[Dict]): The data to upsert into Pinecone.
Raises:
ValueError: If the index has not been created before upserting data.
"""
data = self.convert(data)
if not self.index_name:
raise ValueError("Index must be created before upserting data")
self.pc.Index(name=self.index_name).upsert(data)
# Initialize the PineconeEmbedder class
api_key = os.environ.get("PINECONE_API_KEY")
index_name = "anything"
pinecone_adapter = PineconeAdapter(api_key)
try:
pinecone_adapter.delete_index("anything")
except:
pass
# Initialize the PineconeEmbedder class
pinecone_adapter.create_index(dimension=512, metric="cosine")
clip_model = EmbeddingModel.from_pretrained_hf(
WhichModel.Clip, "openai/clip-vit-base-patch16", revision="main"
)
embed_config = TextEmbedConfig(chunk_size=512, batch_size=32, buffer_size=200)
data = embed_anything.embed_image_directory(
"test_files",
embeder=clip_model,
adapter=pinecone_adapter,
config=embed_config,
)
print(data)