【Spark ML系列】FP-Growth PrefixSpan频繁挖掘算法功能用法示例源码论文详解点击知乎关注免费看
总览
挖掘频繁项、项集、子序列或其他子结构通常是分析大规模数据集的首要步骤,在数据挖掘领域已经成为一个活跃的研究课题。我们建议用户参考维基百科上关于关联规则学习的相关信息。
FP-Growth
FP-growth算法在《Han et al., Mining frequent patterns without candidate generation》一文中进行了描述,其中“FP”代表频繁模式。给定一个交易数据集,FP-growth的第一步是计算项的频率并确定频繁项。与Apriori类似的算法不同,FP-growth的第二步使用后缀树(FP-tree)结构来编码事务,而不需要显式生成候选集,这种生成通常非常耗时。完成第二步后,可以从FP-tree中提取频繁项集。在spark.mllib中,我们实现了FP-growth的并行版本PFP,详见《Li et al., PFP: Parallel FP-growth for query recommendation》。PFP将基于事务的后缀分布到不同的机器上构建FP-tree,因此比单机实现更具扩展性。请参考相关论文以了解更多细节。
FP-growth操作的对象是项集,即无序包含唯一项的集合。由于Spark没有集合类型,所以项集使用数组来表示。
参数
spark.ml的FP-growth实现具有以下(超)参数:
- minSupport:被识别为频繁项集所需的最小支持度。例如,如果一个项在5个交易中出现3次,则其支持度为3/5=0.6。
- minConfidence:用于生成关联规则的最小置信度。置信度指示关联规则成立的频率。例如,如果项集X在事务中出现4次,而X和Y同时出现2次,则规则X => Y的置信度为2/4 = 0.5。该参数不会影响挖掘频繁项集,但是会指定从频繁项集生成关联规则时的最小置信度。
- numPartitions:用于分布式处理的分区数。默认情况下,该参数未设置,将使用输入数据集的分区数。
功能
FPGrowthModel提供以下功能:
- freqItemsets:以DataFrame格式呈现的频繁项集,包含以下列:
- items:array,给定的项集。
- freq:long,根据配置的模型参数计算的项集出现次数。
- associationRules:以DataFrame格式呈现的置信度高于minConfidence的关联规则,包含以下列:
-
antecedent:array,关联规则的前提项集。
-
consequent:array,只包含一个元素的项集,表示关联规则的结论。
-
confidence:double,置信度的定义请参考上述minConfidence。
-
lift:double,衡量前提项集对结论的预测能力,计算公式为
s u p p o r t ( a n t e c e d e n t U c o n s e q u e n t ) / ( s u p p o r t ( a n t e c e d e n t ) x s u p p o r t ( c o n s e q u e n t ) ) support(antecedent U consequent) / (support(antecedent) x support(consequent)) support(antecedentUconsequent)/(support(antecedent)xsupport(consequent)) -
support:double,支持度的定义请参考上述minSupport。
-
transform方法用于每个itemsCol中的事务,将其与所有关联规则的前提项进行比较。如果记录包含特定关联规则的所有前提项,则该规则被视为适用,并将其结论添加到预测结果中。transform方法将从所有适用规则中汇总结论作为预测结果。prediction列的数据类型与itemsCol相同,并且不包含itemsCol中已存在的项。
示例
package org.example.spark
import org.apache.spark.sql.SparkSession
object FPGrowthTest extends App {
import org.apache.spark.ml.fpm.FPGrowth
val spark = SparkSession
.builder()
.appName(s"${this.getClass.getSimpleName}")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val dataset = spark.createDataset(Seq(
"1 2 5 a",
"1 2 3 5 a",
"1 2 a b")
).map(t => t.split(" ")).toDF("items")
val fpgrowth = new FPGrowth().setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6)
val model = fpgrowth.fit(dataset)
// Display frequent itemsets.
model.freqItemsets.show()
// Display generated association rules.
model.associationRules.show()
// transform examines the input items against all the association rules and summarize the
// consequents as prediction
model.transform(dataset).show()
}
//
//+------------+----+
//| items|freq|
//+------------+----+
//| [5]| 2|
//| [5, a]| 2|
//| [5, a, 2]| 2|
//|[5, a, 2, 1]| 2|
//| [5, a, 1]| 2|
//| [5, 2]| 2|
//| [5, 2, 1]| 2|
//| [5, 1]| 2|
//| [1]| 3|
//| [2]| 3|
//| [2, 1]| 3|
//| [a]| 2|
//| [a, 2]| 2|
//| [a, 2, 1]| 2|
//| [a, 1]| 2|
//+------------+----+
//
//+----------+----------+------------------+----+
//|antecedent|consequent| confidence|lift|
//+----------+----------+------------------+----+
//| [2, 1]| [5]|0.6666666666666666| 1.0|
//| [2, 1]| [a]|0.6666666666666666| 1.0|
//| [5, a]| [2]| 1.0| 1.0|
//| [5, a]| [1]| 1.0| 1.0|
//| [5, 1]| [a]| 1.0| 1.5|
//| [5, 1]| [2]| 1.0| 1.0|
//| [5, a, 2]| [1]| 1.0| 1.0|
//| [a, 1]| [5]| 1.0| 1.5|
//| [a, 1]| [2]| 1.0| 1.0|
//| [a]| [5]| 1.0| 1.5|
//| [a]| [2]| 1.0| 1.0|
//| [a]| [1]| 1.0| 1.0|
//| [5, a, 1]| [2]| 1.0| 1.0|
//| [a, 2, 1]| [5]| 1.0| 1.5|
//| [2]| [5]|0.6666666666666666| 1.0|
//| [2]| [1]| 1.0| 1.0|
//| [2]| [a]|0.6666666666666666| 1.0|
//| [5]| [a]| 1.0| 1.5|
//| [5]| [2]| 1.0| 1.0|
//| [5]| [1]| 1.0| 1.0|
//+----------+----------+------------------+----+
//only showing top 20 rows
//
//+---------------+----------+
//| items|prediction|
//+---------------+----------+
//| [1, 2, 5, a]| []|
//|[1, 2, 3, 5, a]| []|
//| [1, 2, b]| [5, a]|
//+---------------+----------+
//
论文1
《Mining Frequent Patterns without CandidateGeneration: A Frequent-Pattern Tree》
-
FP-tree的主要特点包括:
- 高度压缩的结构:通常情况下,FP-tree比原始数据库小得多,通过共享频繁项目的事务来达到压缩的目的。
- 避免候选集生成:采用模式增长的方法,避免了昂贵的候选集生成和测试过程。
- 分治策略:采用划分数据库和分治的策略,可以显著减少后续的条件模式基数和条件FP-tree的大小。
- 直接模式生成:针对单路径FP-tree,采用直接枚举子路径生成模式,避免了构建条件FP-tree。
- 最小频繁项目作为后缀:使用最小频繁项目作为后缀,提供了良好的选择性,减少了搜索空间。
- 主要操作为计数和调整前缀路径计数:避免了大多数Apriori-like算法中的候选集生成和模式匹配操作。
- 避免重复扫描数据库:只需要对FP-tree进行一次扫描,避免了重复扫描原始数据库。
-
高度可扩展:能够高效处理包含大量长模式的数据集,且是现有候选集生成算法的10倍左右。
综上所述,FP-tree通过压缩结构、避免候选集生成和分治策略等手段,实现了对频繁模式的高效挖掘。
-
FP-growth算法与Apriori算法的主要区别是什么?
-
候选项集生成方式:
- Apriori算法通过不断生成候选项集并测试其支持度,来发现频繁项集。
- FP-growth算法则通过构造FP树来压缩存储数据库中的频繁模式信息,并基于模式片段生长的方法来发现频繁项集,避免了候选项集的生成。
-
搜索方式:
- Apriori算法采用水平搜索的方式,通过组合不同层的频繁项集来搜索频繁项集。
- FP-growth算法采用分区和分治的搜索技术,将搜索任务划分为更小的子任务,显著减少了搜索空间。
-
数据结构:
- Apriori算法主要依赖于事务数据库。
- FP-growth算法则利用FP树这一新的数据结构来压缩存储频繁模式信息,避免了对原始数据库的重复扫描。
-
计算代价:
- Apriori算法需要计算大量的候选项集支持度。
- FP-growth算法主要进行计数累积和前缀路径计数调整,计算代价较小。
-
适用场景:
- Apriori算法适合发现短频繁模式。
- FP-growth算法更适合处理长频繁模式。
-
效率:
- FP-growth算法相比Apriori算法更高效,尤其在稠密数据集或长频繁模式的情况下。
综上所述,FP-growth算法通过模式生长避免了候选项集生成,利用FP树压缩数据,采用分区搜索缩小搜索空间,从而比
Apriori算法更高效地发现频繁项集。
-
候选项集生成方式:
-
FP-growth算法在处理大规模数据库时,主要通过以下几种方式来保证可扩展性:文章来源:https://www.toymoban.com/news/detail-825927.html
- 构建FP-tree:采用构建FP-tree数据结构,该结构通常比原始数据库小得多,从而避免对原始数据库的重复扫描,减少磁盘I/O开销。
- 避免候选集生成:采用模式增长方法,避免生成和测试大量候选集,而是直接在FP-tree上进行模式增长,降低了计算开销。
- 分治策略:采用分治策略,将整个挖掘任务分解为一系列较小的任务,每个任务只涉及一个频繁项,从而减少了搜索空间。
- 并行计算:利用FP-tree结构特点,可以并行地计算每个频繁项的局部FP-tree,进一步减少了计算开销。
- 动态构建FP-tree:不一次性构建完整的FP-tree,而是根据需要动态构建,避免了存储空间的开销。
- 条件模式基数和条件FP-tree:利用条件模式基数和条件FP-tree,避免重复计算公共的前缀路径,从而减少计算开销。
- 使用最小频繁项作为后缀:使用最小频繁项作为后缀,提供了良好的选择性,减少了搜索空间。
- 递归调用模式增长:采用递归调用模式增长,递归的深度通常比较浅,减少了递归的开销。
-
动态投影数据库:在无法存储FP-tree的情况下,采用动态投影数据库的方法,将数据库拆分为多个子库,从而减少计算和存储的开销。
综上所述,FP-growth算法通过以上措施,能够有效地处理大规模数据库,保证算法的可扩展性。
论文2
《Li et al., PFP: Parallel FP-growth for query recommendation》这是spark实现算法主要参考的论文文章来源地址https://www.toymoban.com/news/detail-825927.html
- 本文主要提出了一个大规模并行的FP-growth算法PFP,旨在解决大规模频繁项集挖掘问题。主要内容包括:
- 并行FP-growth算法PFP:将大规模挖掘任务划分为独立的并行任务,并使用MapReduce模型进行计算,以实现接近线性的加速比。实验证明PFP算法可以处理包含800多万网页和1000多万标签的大规模数据集。
- 应用于查询推荐:将PFP算法应用于从del.icio.us网站挖掘的网页和标签数据,以发现标签-标签关联和网页-网页关联,为查询推荐或相关搜索提供支持。
- 实验结果:实验结果表明PFP算法具有良好的可扩展性,在2500台计算机上只需24分钟即可完成挖掘,并且发现的相关模式可以用于查询推荐。
-
结论:PFP算法是一个高效的并行FP-growth算法,可以扩展到大规模数据集上进行频繁项集挖掘,为查询推荐等应用提供支持。
综上所述,本文的核心贡献在于提出了一个可扩展的并行FP-growth算法PFP,并通过实验证明了其在支持查询推荐方面的有效性。
- PFP算法相比之前的并行FP-Growth算法有以下优势:
- 降低通信开销:PFP算法采用MapReduce模型,将计算任务划分为独立的任务,降低了计算机之间的通信开销。
- 故障恢复能力:MapReduce模型提供了故障恢复功能,增强了PFP算法的可靠性。
- 提高可扩展性:PFP算法实现了接近线性的加速比,可以扩展到大规模数据集,而之前的算法受到通信开销的制约,难以实现线性加速比。
- 支持长尾数据挖掘:PFP算法支持挖掘长尾数据,不需要全局最小支持度,可以挖掘出不同支持度的模式,而之前的算法通常需要设置较高的支持度。
-
挖掘质量高:PFP算法的挖掘结果质量高,包含大量的长尾模式,适用于查询推荐等应用。
综上所述,PFP算法在可扩展性、可靠性、长尾数据挖掘等方面优于之前的并行FP-Growth算法,是一个高效的并行FP-Growth算法。
- PFP算法在查询推荐中的应用主要基于其在大规模数据集中挖掘标签-标签和网页-网页关联的能力。具体来说,PFP算法在查询推荐中的应用包括:
- 标签查询推荐:利用标签-标签关联,当用户查询一个标签时,可以推荐与之相关的标签,从而帮助用户扩展查询,例如查询“java”时推荐“javascript”。
- 网页查询推荐:利用网页-网页关联,当用户查询一个网页时,可以推荐与之相关的网页,帮助用户找到更多相关内容,例如查询“维基百科”时推荐“百度百科”。
-
标签网页关联推荐:利用标签-网页和网页-标签关联,当用户查询一个标签或网页时,可以推荐与之相关的标签或网页,实现跨语言、跨领域的查询推荐。
这些应用在搜索、信息检索、问答等系统中具有广泛的应用价值,可以帮助用户更高效地发现和获取信息。
源码
/**
* FPGrowth和FPGrowthModel的共同参数
*/
private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
/**
* 项列名。
* 默认值:"items"
* @group param
*/
@Since("2.2.0")
val itemsCol: Param[String] = new Param[String](this, "itemsCol", "项列名")
/** @group getParam */
@Since("2.2.0")
def getItemsCol: String = $(itemsCol)
/**
* 频繁模式的最小支持度。[0.0, 1.0]。任何出现次数超过(minSupport * 数据集大小)的模式都会在频繁项集中输出。
* 默认值:0.3
* @group param
*/
@Since("2.2.0")
val minSupport: DoubleParam = new DoubleParam(this, "minSupport",
"频繁模式的最小支持度",
ParamValidators.inRange(0.0, 1.0))
/** @group getParam */
@Since("2.2.0")
def getMinSupport: Double = $(minSupport)
/**
* 并行FP-growth使用的分区数(至少为1)。默认情况下,未设置该参数,将使用输入数据集的分区数。
* @group expertParam
*/
@Since("2.2.0")
val numPartitions: IntParam = new IntParam(this, "numPartitions",
"并行FP-growth使用的分区数", ParamValidators.gtEq[Int](1))
/** @group expertGetParam */
@Since("2.2.0")
def getNumPartitions: Int = $(numPartitions)
/**
* 生成关联规则的最小置信度。minConfidence不会影响频繁项集的挖掘,但会影响关联规则的生成。
* 默认值:0.8
* @group param
*/
@Since("2.2.0")
val minConfidence: DoubleParam = new DoubleParam(this, "minConfidence",
"生成关联规则的最小置信度",
ParamValidators.inRange(0.0, 1.0))
/** @group getParam */
@Since("2.2.0")
def getMinConfidence: Double = $(minConfidence)
setDefault(minSupport -> 0.3, itemsCol -> "items", minConfidence -> 0.8)
/**
* 验证和转换输入模式。
* @param schema 输入模式
* @return 输出模式
*/
@Since("2.2.0")
protected def validateAndTransformSchema(schema: StructType): StructType = {
val inputType = schema($(itemsCol)).dataType
require(inputType.isInstanceOf[ArrayType],
s"输入列必须是${ArrayType.simpleString}类型,但得到的是${inputType.catalogString}类型.")
SchemaUtils.appendColumn(schema, $(predictionCol), schema($(itemsCol)).dataType)
}
}
/**
* 并行FP-growth算法用于挖掘频繁项集。该算法的描述参见
* <a href="https://doi.org/10.1145/1454008.1454027">Li et al., PFP: Parallel FP-Growth for Query
* Recommendation</a>。PFP将计算分配到每个工作节点上,每个节点执行一个独立的挖掘任务组。
* FP-Growth算法的描述参见
* <a href="https://doi.org/10.1145/335191.335372">Han et al., Mining frequent patterns without
* candidate generation</a>。在fit()过程中,itemsCol列中的空值将被忽略。
*
* @see <a href="http://en.wikipedia.org/wiki/Association_rule_learning">
* Association rule learning (Wikipedia)</a>
*/
@Since("2.2.0")
class FPGrowth @Since("2.2.0") (
@Since("2.2.0") override val uid: String)
extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable {
@Since("2.2.0")
def this() = this(Identifiable.randomUID("fpgrowth"))
/** @group setParam */
@Since("2.2.0")
def setMinSupport(value: Double): this.type = set(minSupport, value)
/** @group expertSetParam */
@Since("2.2.0")
def setNumPartitions(value: Int): this.type = set(numPartitions, value)
/** @group setParam */
@Since("2.2.0")
def setMinConfidence(value: Double): this.type = set(minConfidence, value)
/** @group setParam */
@Since("2.2.0")
def setItemsCol(value: String): this.type = set(itemsCol, value)
/** @group setParam */
@Since("2.2.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)
@Since("2.2.0")
override def fit(dataset: Dataset[_]): FPGrowthModel = {
transformSchema(dataset.schema, logging = true)
genericFit(dataset)
}
private def genericFit[T: ClassTag](dataset: Dataset[_]): FPGrowthModel = instrumented {
instr =>
val handlePersistence = dataset.storageLevel == StorageLevel.NONE
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, params: _*)
val data = dataset.select($(itemsCol))
val items = data.where(col($(itemsCol)).isNotNull).rdd.map(r => r.getSeq[Any](0).toArray)
val mllibFP = new MLlibFPGrowth().setMinSupport($(minSupport))
if (isSet(numPartitions)) {
mllibFP.setNumPartitions($(numPartitions))
}
if (handlePersistence) {
items.persist(StorageLevel.MEMORY_AND_DISK)
}
val inputRowCount = items.count()
instr.logNumExamples(inputRowCount)
val parentModel = mllibFP.run(items)
val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
val schema = StructType(Array(
StructField("items", dataset.schema($(itemsCol)).dataType, nullable = false),
StructField("freq", LongType, nullable = false)))
val frequentItems = dataset.sparkSession.createDataFrame(rows, schema)
if (handlePersistence) {
items.unpersist()
}
copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount))
.setParent(this)
}
@Since("2.2.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
}
@Since("2.2.0")
override def copy(extra: ParamMap): FPGrowth = defaultCopy(extra)
}
/**
* 并行FP-growth算法用于挖掘频繁项集。该算法的描述参见
* <a href="https://doi.org/10.1145/1454008.1454027">Li et al., PFP: Parallel FP-Growth for Query
* Recommendation</a>。PFP将计算分配到每个工作节点上,每个节点执行一个独立的挖掘任务组。
* FP-Growth算法的描述参见
* <a href="https://doi.org/10.1145/335191.335372">Han et al., Mining frequent patterns without
* candidate generation</a>。在fit()过程中,itemsCol列中的空值将被忽略。
*
* @see <a href="http://en.wikipedia.org/wiki/Association_rule_learning">
* Association rule learning (Wikipedia)</a>
*/
@Since("2.2.0")
class FPGrowth @Since("2.2.0") (
@Since("2.2.0") override val uid: String)
extends Estimator[FPGrowthModel] with FPGrowthParams with DefaultParamsWritable {
@Since("2.2.0")
def this() = this(Identifiable.randomUID("fpgrowth"))
/** @group setParam */
@Since("2.2.0")
def setMinSupport(value: Double): this.type = set(minSupport, value)
/** @group expertSetParam */
@Since("2.2.0")
def setNumPartitions(value: Int): this.type = set(numPartitions, value)
/** @group setParam */
@Since("2.2.0")
def setMinConfidence(value: Double): this.type = set(minConfidence, value)
/** @group setParam */<
到了这里,关于【Spark ML系列】Frequent Pattern Mining频繁挖掘算法功能用法示例源码论文详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!