Frequent Pattern Mining - RDD-based API

挖掘频繁项,频繁项集,子序列或者是其他子结构 通常是分析大型数据集的第一步,这已经是数据挖掘多年的积极研究课题。有关 更多信息,请参考维基百科的关联规则学习spark.mllib 提供 FP-growth 的并行实现,这是一种普遍的挖掘频繁项集的算法。

FP-growth

FP-growth 算法是由韩家炜在 Han et al., Mining frequent patterns without candidate generation 提出的,“FP”代表频繁模式的意思。该算法的基本思路如下,给予一个事务数据库,FP-Growth 算法的第一步是每一项出现的次数,通过最小支持度进行筛选确定频繁项。不同于另一种关联算法Apriori 算法,FP-growth 算法第二步是通过产生后缀树( FP-tree )来存储所有的事务数据库中的项,而不是像 Apriori 算法那样花费大量的内存去产生候选项集。然后,通过遍历 FP-Tree 可以挖掘出频繁项集。在 spark.mllib 中,我们实现了并行的 FP-growth 算法叫做 PFP,正如论文Li et al., PFP: Parallel FP-growth for query recommendation 中描述的,PFP 将基于相同后缀的事务分发到相同的机器上,因此相比的单台机器的实现,这样有更好的扩展性。我们推荐用户读 Li et al., PFP: Parallel FP-growth for query recommendation这篇论文去理解更多的信息。 spark mllib 中 FP-growth 算法的实现要使用到以下两个超参数: • minSupport (最小支持度):一个项集被认为是频繁项集的最小支持度。例如,如果一个项总共有5个事务的数据集中,出现了3次,那么它的支持度为3/5=0.6 • numPartitions (分区数):分区的个数,同于将事务分配到几个分区。 例子

文中描述了 FP-growth 算法, Han et al., 挖掘频繁模式,无候选人生成 ,其中 “FP” 代表频繁模式。给定交易数据集,FP-growth 的第一步是计算项目频率并识别频繁项。与同样目的设计的类似 Apriori-like 的算法不同, FP-growth 的第二步使用后缀树 (FP-tree) 结构来编码事务,而不会显式生成候选集,生成的代价通常很高。第二步之后,可以从 FP-tree 中提取频繁项集。在 spark.mllib 我们中,我们实现了一个名为 PFP 的 FP-growth 并行版本,在 Li et al., PFP: Parallel FP-growth for query recommendation 中描述。PFP 根据事务后缀分配生长 FP-tree 的工作,因此比单机实现更具可扩展性。我们将用户参考论文了解更多细节。

spark.mllib 的 FP-growth 实现需要以下 (hyper-) 参数:

示例:

FPGrowth 实现 FP-growth 算法。它采用事务的 RDD ,每个事务是 泛型类型的数组的 Array。 使用事务调用 FPGrowth.run 返回一个FPGrowthModel ,它存储具有其频率的频繁项集。 以下示例说明了如何从事务中挖掘频繁项集和关联规则(请参阅 关联规则 的详细信息)。

有关 API 的详细信息,请参阅 FPGrowth Scala 文档

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
      + " => " + rule.consequent .mkString("[", ",", "]")
      + ", " + rule.confidence)
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/SimpleFPGrowth.scala" in the Spark repo.

FPGrowth 实现 FP-growth 算法。 它采用 JavaRDD 的事务,其中每个事务是泛型类型的项的Iterable 。 使用事务调用 FPGrowth.run 返回一个 FPGrowthModel,它存储具有其频率的频繁项集。 以下示例说明了如何从事务中挖掘频繁项集和关联规则(请参阅 关联规则 的详细信息)。

有关API的详细信息,请参阅 FPGrowth Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");

JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));

FPGrowth fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);

for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
  System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}

double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
  : model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java" in the Spark repo.

FPGrowth 实现 FP-growth 算法。 它需要事务的 RDD ,其中每个事务是通用类型的项目的List。 使用事务调用 FPGrowth.train 返回一个 FPGrowthModel ,它存储具有其频率的频繁项集。

有关API的更多详细信息,请参阅FPGrowth Python 文档

from pyspark.mllib.fpm import FPGrowth

data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
Find full example code at "examples/src/main/python/mllib/fpgrowth_example.py" in the Spark repo.

关联规则

AssociationRules 实现了一个并行规则生成算法,用于构建具有单个项目作为结果的规则。

有关API的详细信息,请参阅 AssociationRules Scala 文档

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("a"), 15L),
  new FreqItemset(Array("b"), 35L),
  new FreqItemset(Array("a", "b"), 12L)
))

val ar = new AssociationRules()
  .setMinConfidence(0.8)
val results = ar.run(freqItemsets)

results.collect().foreach { rule =>
  println("[" + rule.antecedent.mkString(",")
    + "=>"
    + rule.consequent.mkString(",") + "]," + rule.confidence)
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/AssociationRulesExample.scala" in the Spark repo.

AssociationRules 实现了一个并行规则生成算法,用于构建具有单个项目作为结果的规则。 有关API的详细信息,请参阅 AssociationRules Java 文档

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;

JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
  new FreqItemset<>(new String[] {"a"}, 15L),
  new FreqItemset<>(new String[] {"b"}, 35L),
  new FreqItemset<>(new String[] {"a", "b"}, 12L)
));

AssociationRules arules = new AssociationRules()
  .setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);

for (AssociationRules.Rule<String> rule : results.collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java" in the Spark repo.

PrefixSpan

PrefixSpan 是一种序列模式挖掘算法, 具体描述见论文 Pei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approach。我们推荐读者去读相关的论文去更加深入的理解的序列模式挖掘问题。

spark.mllib 的 PrefixSpan 实现需要以下参数:

示例:

以下示例说明了在序列上运行的 PrefixSpan(使用与 Pei 等同的符号):

  <(12)3>
  <1(32)(12)>
  <(12)5>
  <6>

PrefixSpan 实现 PrefixSpan 算法。 调用 PrefixSpan.run 返回一个 PrefixSpanModel,用于存储具有其频率的频繁序列。

有关API的详细信息,请参阅 PrefixSpan Scala 文档PrefixSpanModel Scala 文档

import org.apache.spark.mllib.fpm.PrefixSpan

val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
  println(
    freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]") +
      ", " + freqSequence.freq)
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/PrefixSpanExample.scala" in the Spark repo.

PrefixSpan 实现 PrefixSpan 算法。 调用 PrefixSpan.run 返回一个 PrefixSpanModel ,用于存储具有其频率的频繁序列。

有关API的详细信息,请参阅 PrefixSpan Java 文档PrefixSpanModel Java 文档

import java.util.Arrays;
import java.util.List;

import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;

JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
  Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
  Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
  Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
  Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
  System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaPrefixSpanExample.java" in the Spark repo.