spark基于HNSW向量检索

这篇具有很好参考价值的文章主要介绍了spark基于HNSW向量检索。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

参考文档:https://talks.anghami.com/blazing-fast-approximate-nearest-neighbour-search-on-apache-spark-using-hnsw/
HNSW参数调优文档:https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md

spark 运行HNSW向量检索分为以下三步
1 创建HNSW索引,并存储到磁盘
2 将存储的索引分发到每个executor
3 进行向量检索
使用HHSW构建索引,并使用spark进行分布式向量检索,1200万向量构建索引40分钟,向量检索10分钟完成(时间取决于m和ef的大小,本人m=30,ef=1000,不然总是报错m或者ef太小)如m=30,ef=1000 1200万构建索引20分钟,向量检索还是10分钟。

1 创建HNSW索引

输入为spark dataset格式数据,有id和features组成,features为Array[Float]形式向量


import com.stepstone.search.hnswlib.jna.{Index, SpaceName}
import org.apache.spark.SparkFiles
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import java.nio.file.Paths
import scala.reflect.runtime.universe.TypeTag
class annUtilsHnsw {


  /**
   * Builds an hnsw index.
   *
   * Default HNSW parameters are found to be good enough.
   *
   * HNSW index requires integer based object ids, so the builder re-indexes the original objects keys into integer
   * keys.
   *
   * For information on HNSW parameter tuning, [[https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md]]
   *
   * @param vectorSize features vector size
   * @param features objects features to build an index for
   * @param m a parameter for construction HNSW index
   * @param efConstruction a parameter for construction HNSW index
   * @tparam Key type of the object id in features objects
   * @return
   */
  def buildHnswIndex[Key : TypeTag : Encoder](spark:SparkSession,vectorSize: Int,
                                              features: Dataset[(Key, Array[Float])],
                                              m: Int = 100,
                                              efConstruction: Int = 200): HnswIndex[Key] = {

    // map objects keys to integer based index to be used in the HNSW index as it only accepts integer key
    import spark.implicits._
    val featuresReindexed = features
      .rdd.zipWithIndex().map(x=>{
      (x._1._1,x._1._2,x._2.toInt)
    })     .toDF("id", "features","index_id")
      .select("index_id", "id", "features")
      .cache()
    // collect feature vectors
    val featuresList = featuresReindexed
      .select($"index_id", $"features".cast("array<float>"))
      .as[(Int, Array[Float])]
      .collect()

    val objectIDsMap = featuresReindexed
      .select("index_id", "id")
      .as[(Int, Key)]
      .repartition(100)

    // build index
    val index = new Index(SpaceName.COSINE, vectorSize)
    index.initialize(featuresList.length, m, efConstruction, (System.currentTimeMillis() / 1000).toInt)
    //    index.initialize(indexLength, 16, 200, (System.currentTimeMillis() / 1000).toInt)
    println("featuresList length",featuresList.length)
    // add vectors in parallel using .par
    featuresList.par.foreach {
      case (id: Int, vector: Array[Float]) =>
        index.addItem(vector, id)
    }

    // return wrapped index
    new HnswIndex(vectorSize, index, objectIDsMap)
  }


}





2 索引存储及查找

存储索引,加载索引并分发到每个executor.然后进行ANN查找


import com.stepstone.search.hnswlib.jna.{Index, SpaceName}
import org.apache.spark.SparkFiles
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import java.nio.file.Paths
import scala.reflect.runtime.universe.TypeTag

class HnswIndex[DstKey : TypeTag : Encoder](vectorSize: Int,
                                            index: Index,
                                            objectIDsMap: Dataset[(Int, DstKey)]) {


  /**
   * Executres KNN query using an HNSW index.
   *
   * @param queryFeatures      features to generates recs for
   * @param minScoreThreshold  Minimum similarity/distance.
   * @param topK               number of top recommendations to generate per instance
   * @param ef                 HNSW search time parameter
   * @param queryNumPartitions number of partitions for query vectors
   * @return
   */
  def knnQuery[SrcKey: TypeTag : Encoder](spark: SparkSession, queryFeatures: Dataset[(SrcKey, Array[Float])],
                                          minScoreThreshold: Double,
                                          topK: Int,
                                          ef: Int,
                                          queryNumPartitions: Int = 200, indexSavePath: String, m: Int, efConstruction: Int): Dataset[(SrcKey, DstKey, Double)] = {


    import spark.implicits._
    // init tmp directory
    val indexLength = index.getLength

    val saveLocalPath = "index"
    val indexLocalLocation = Paths.get(saveLocalPath)
    val indexFileName = indexLocalLocation.getFileName.toString
    println("indexFileName", indexFileName)
    // saving index locally
    index.save(indexLocalLocation)

    println(index.getData(0).get().mkString(","))
    val saveAbsoluteLocalPath = saveLocalPath
    println("local path", indexLocalLocation.toAbsolutePath.toString)
    println("absolute path: ", saveAbsoluteLocalPath)
    // add file to spark context to be sent to running nodes
    spark.sparkContext.addFile(indexFileName, true)
    //    spark.sparkContext.addFile(indexSavePath,true)

    println("context path: ", SparkFiles.getRootDirectory + "/" + indexFileName)

    // The current interface to HNSW misses the functionality of setting the ef query time
    // parameter, but it's lower bounded by topK as per https://github.com/nmslib/hnswlib/blob/master/ALGO_PARAMS.md#search-parameters,
    // so set a large value of k as max(ef, topK) to get high recall, then cut off after getting the nearest neighbor.
    val k = math.max(topK, ef)

    // local scope vectorSize
    val vectorSizeLocal = vectorSize

    // execute querying
    queryFeatures
      .repartition(queryNumPartitions)
      .toDF("id", "features")
      .withColumn("features", $"features".cast("array<float>"))
      .as[(SrcKey, Array[Float])]
      .mapPartitions((it: Iterator[(SrcKey, Array[Float])]) => {
        // load index
        val index = new Index(SpaceName.COSINE, vectorSizeLocal)

        index.initialize(indexLength, m, efConstruction, (System.currentTimeMillis() / 1000).toInt)
        index.load(Paths.get(SparkFiles.getRootDirectory + "/" + indexFileName), indexLength)


        it.flatMap(x => {
          val idx = x._1
          val vector = x._2
          val queryTuple = index.knnQuery(vector, k)
          val result = queryTuple.getIds
            //            queryTuple.getLabels
            .zip(queryTuple.getCoefficients)
            .map(qt => (idx, qt._1, 1.0 - qt._2.toDouble))
            .filter(_._3 >= minScoreThreshold)
            .sortBy(_._3)
            .reverse
            .slice(0, topK)
          result
        })
      })

      .as[(Int, Int, Double)]
      .toDF("src_id", "index_id", "score")
      .join(objectIDsMap.toDF("index_id", "dst_id"), Seq("index_id"))
      .select("src_id", "dst_id", "score")
      .repartition(400)
      .as[(SrcKey, DstKey, Double)]

  }
}

3 word2vec向量检索实例

  • 训练word2vec模型
  • 将模型的向量取出,调用上面buildHnswIndex 构建索引
  • 分布式进行knnQuery 向量检索


import org.apache.spark.ml.feature.Word2VecModel
import org.apache.spark.ml.linalg.DenseVector

object exampleWord2Vec {
  def main(args: Array[String]): Unit = {
   val spark = SparkSession.builder().getOrCreate()
    
    val GraphInputModel =  "graph/model/word2vecmodel"
    val indexPath =  "graph/model/index"
    spark.udf.register("denseVec2Array",(vec:DenseVector ) => vec.toArray.map(_.toFloat))
    spark.udf.register("vectorSplit",(a:String)=>(a.split(',').map(_.toFloat)))
    import spark.implicits._
    val word2vec = Word2VecModel.load(GraphInputModel )
    println(word2vec .getVectors.schema)
    word2vec .getVectors.show(10)
    println(word2vec .getVectors.count())
    val itemEmbeddings = word2vec .getVectors.selectExpr("cast(word as Int) as word", "denseVec2Array(vector) features")
      .as[(Int,Array[Float])]
    itemEmbeddings.show()
    println(itemEmbeddings.schema)
    val vectorsize=itemEmbeddings.take(1)(0)._2.length

    val hnswIndex = new annUtilsHnsw().buildHnswIndex(spark, vectorsize, itemEmbeddings, 20)
    val queryDF=hnswIndex.knnQuery[Int](spark,itemEmbeddings.limit(20),0.3,20,200,160,indexPath,20,200)
    queryDF .write.mode("overwrite").save(savePathMl + "graph/muiscEmbedding")
  }

}

4 HNSW pom依赖文件

hnswlib-jna文章来源地址https://www.toymoban.com/news/detail-668325.html

        <dependency>
            <groupId>com.stepstone.search.hnswlib.jna</groupId>
            <artifactId>hnswlib-jna</artifactId>
            <version>1.4.2</version>
        </dependency>

到了这里,关于spark基于HNSW向量检索的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 基于spark法律服务大数据智能推荐

    背景 随着互联网和信息技术的快速发展,电子商务、网上服务与交易等网络业务越来越普及,这些操作会产生大量数据(或海量数据),用户想要从海量数据中快速准确地寻找到自己感兴趣的信息已经变得越来越困难,这也就造就了搜索引擎的诞生,应用比较广泛的如Googl

    2024年02月10日
    浏览(41)
  • 推荐算法:HNSW算法简介

    推荐算法:HNSW算法简介 1. HNSW算法概述 2. HNSW算法原理 1. Delaunay图 2. NSW算法 3. HNSW算法 3. HNSW算法实现 1. hnswlib 2. nmslib 3. faiss 4. 参考链接 文献链接:Efficient and robust approximate nearest neighbor search using Hierarchical Navigable Small World graphs HNSW(Hierarchical Navigable Small Word)算法算是目前推

    2024年02月14日
    浏览(29)
  • 基于Spark+django的国漫推荐系统--计算机毕业设计项目

    近年来,随着互联网的蓬勃发展,企事业单位对信息的管理提出了更高的要求。以传统的管理方式已无法满足现代人们的需求。为了迎合时代需求,优化管理效率,各种各样的管理系统应运而生,随着各行业的不断发展,基于Spark的国漫推荐系统的建设也逐渐进入了信息化的

    2024年02月11日
    浏览(50)
  • Spark Hive实现基于协同过滤的电影推荐(MovieLens数据集)

      这篇文章记录一下我之前做过的通过Spark与Hive实现的基于协调过滤的电影推荐。这篇文章只能提供算法、思路和过程记录,并没有完整的代码,仅尽量全面地记录过程细节方便参考。   数据集是从下面这个地址下载的,数据集主要内容是关于用户对电影的评分、评价等。免

    2024年02月10日
    浏览(49)
  • 百度自研高性能ANN检索引擎,开源了

    作者 | Puck项目组 导读 Puck是百度自研的开源ANN检索引擎。Puck开源项目包含两种百度自研的检索算法,以高召回、高准确、高吞吐为目标,适用于多种数据规模和场景。随着业务发展不断的优化和迭代,进行充分的技术开发和测试,确保了技术的可靠性和成熟度。该项目于2

    2024年02月09日
    浏览(44)
  • 《向量数据库指南》——用 Milvus Cloud和 NVIDIA Merlin 搭建高效推荐系统结果

    结果 以下展示基于 CPU 和 GPU 的 3 组性能测试结果。该测试使用了 Milvus 的 HNSW(仅 CPU)和IVF_PQ(CPU 和 GPU)索引类型。 对于给定的参数组合,将 50% 的商品向量作为查询向量,并从剩余的向量中查询出 top-100 个相似向量。我们发现,在测试的参数设置范围内,HNSW 和 IVF_PQ 的召

    2024年02月05日
    浏览(57)
  • Langchain使用介绍之 - 基于向量存储进行检索

    Text Embedding Models   如何将一段Document转换成向量存储到向量数据库中,首先需要了解Langchain提供了哪些将文本转换成向量的model,langchian提供了很多将自然语言转换成向量的的模型,如下图所示,除了下图列举的model,更多支持的model可参考官网信息。    这里将以Langchain提供

    2024年02月09日
    浏览(43)
  • 基于SimCSE和Faiss的文本向量检索实践

    目录 文本的向量表示 1、SimCSE 2、支持无监督训 3、训练注意事项 向量检索 1、精准查找flat 2、HNSWx 3、IVFx 4、PQx 5、LSH 对博客标题进行向量检索 数据向量化 构建索引 文本检索 测试检索 传统的文本检索一般是建立倒排索引,对搜索词的召回结果进行打分排序返回最终结果,但

    2024年02月16日
    浏览(42)
  • 基于Langchain的txt文本向量库搭建与检索

    这里的源码主要来自于Langchain-ChatGLM中的向量库部分,做了一些代码上的修改和封装,以适用于基于 问题 和 包含数据库表描述的txt文件 (文件名为库表名,文件内容为库表中的字段及描述)对数据库表进行快速检索。 splitter.py myfaiss.py embedder.py Config是用来传参的类,这里略

    2024年02月04日
    浏览(36)
  • 《向量数据库》——怎么安装向量检索库Faiss?

    装 Faiss   以下教程将展示如何在 Linux 系统上安装 Faiss:   1. 安装 Conda。   在安装 Faiss 之前,先在系统上安装 Conda。Conda 是一个开源软件包和环境管理系统,可在 Windows、macOS 和 Linux 操作系统上运行。根据以下步骤在 Linux 系统上安装 Conda。   2. 从官网下载 Miniconda 安装包(

    2024年02月13日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包