[repost ]ML Pipelines:Spark 1.2中一个用于MLlib的High-Level API

作者Databricks Blog 编译:仲浩

在每次版本更新中,除下新算法和性能升级,Databricks在MLlib的易用性上同样投入了大量精力。类似Spark Core,MLlib提供了3个编程语言的API:Python、Java和Scala。除此之外,MLlib同样提供了代码示例,以方便不同背景用户的学习和使用。在Spark 1.2中,通过与AMPLab(UC Berkeley)合作,一个 pipeline API被添加到MLlib,再次简化了MLlib的建立工作,并添加了针对ML pipelines的调优机制。

实际应用中,一个ML pipeline往往包括一系列的阶段,比如数据预处理、特征提取、模型拟合及可视化。举个例子,文本分类可能就会包含文本分割与清洗、特征提取,并通过交叉验证训练一个分类模型。虽然当下每个步骤都有许多库可以使用,但是将每个步骤连接起来却并不是件容易的事情,特别是在大规模场景下。当下,大部分的库都不支持分布式计算,或者他们并不支持一个原生的pipeline建立和优化。不幸的是,这个问题经常被学术界所忽视,而在工业界却又不得不重点讨论。

本篇博文将简述Databricks和AMPLab在ML pipeline(MLlib)所做的工作,其中有些设计由scikit-learn项目和一些前期MLI工作启发而来。

Dataset Abstraction

在新的pipeline设计时,数据集通常由Spark SQL的SchemaRDD以及ML pipeline的一系列数据集转换表现。每个转换都会摄入一个输入数据集,并输出一个已转换数据集,同时输出数据集将成为下一个步骤的输入数据集。之所以使用Spark SQL,主要考虑到以下几个因素:数据导入/输出、灵活的列类型和操作,以及执行计划优化。

数据的输入和输出是一个ML pipeline的起点和终点。MLlib当下已为数种类型提供了实用的输入和输出工具,其中包括用于分类和回归的LabeledPoint、用于协同过滤的Rating等。然而真实的数据集可能会包含多种类型,比如用户/物品 ID、时间戳,亦或是原始记录,而当下的工具并没有很好地支持所有这些类型。同时,它们还使用了从其他ML库中继承的无效率文本存储格式。

通常主流的ML pipeline都会包含特征转换阶段,你可以把特征转换看成在现有列上加上一个新列。举个例子,比如:文本分词将文档拆成大量词,而tf-idf则将这些词转换为一个特征向量。在这个过程中,标签会被加工用于模型拟合。同时,在实际过程中,更复杂的特征转换也经常会出现。因此,数据集需要支撑不同类型的列,包括密集/稀疏向量,以及为现有列建立新列的操作。

pipeline-0

在上面这个例子中,id、text以及words在转换中都会被转入。在模型拟合中,它们是不需要的,但是在预测和模型校验时它们又会被用到。如果预测数据集只包含predicted labels,那么它们不会提供太多的信息。如果我们希望检验预测结果,比如检验false positives,那么结合predicted labels 、原始输入文本及tokenized words则是非常有必要的。这样一来,如果底层执行引擎经过优化,并且只加载所需列将是很必要的。

幸运的是,Spark SQL已经提供了大多数所期望的功能,机构不需要再重新开始。Spark支持从Parque读取SchemaRDDs,并支持将SchemaRDDs写入对应的Parque。Parque是一个非常有效的列存储格式,可以在RDDs和SchemaRDDs之间自由转换,它同时还支持Hive和Avro这样的外部数据源。使用Spark SQL,建立(说声明可能更为准确)新列将非常便捷和友好。SchemaRDD实体化使用了lazy模式,Spark SQL可以基于列的需求来优化执行计划,可以较好的满足用户需求。SchemaRDD支持标准的数据类型,为了让其可以更好地支持ML,技术团队为其添加了对向量类型的支持(用户定义类型),同时支持密集和稀疏特征向量。

下面是一段Scala代码,它实现了ML数据集导入/输出,以及一些简单的功能。在Spark知识库“examples/”目录下,你发现一些更加复杂的数据集示例(使用Scala和Python)。在这里,我们推荐用户阅读Spark SQL’s user guide以查看更多SchemaRDD详情,以及它所支撑的操作。

val sqlContext = SQLContext(sc)
import sqlContext._ // implicit conversions

// Load a LIBSVM file into an RDD[LabeledPoint].
val labeledPointRDD: RDD[LabeledPoint] =
  MLUtils.loadLibSVMFile("/path/to/libsvm")

// Save it as a Parquet file with implicit conversion
// from RDD[LabeledPoint] to SchemaRDD.
labeledPointRDD.saveAsParquetFile("/path/to/parquet")

// Load the parquet file back into a SchemaRDD.
val dataset = parquetFile("/path/to/parquet")

// Collect the feature vectors and print them.
dataset.select('features).collect().foreach(println)

Pipeline

新的Pipeline API位于名为“spark.ml”的包下。Pipeline由多个步骤组成, 这些步骤一般可分为两个类型: Transformer和Estimator。Transformer会摄入一个数据集,并输出一个新的数据集。比如,分词组件就是一个Transformer,它会将一个文本数据集转换成一个tokenized words数据集。Estimator首先必须满足输入数据集,并根据输入数据集产生一个模型。举个例子,逻辑归回就是一个Estimator,它会在一个拥有标签和特征的数据集上进行训练,并返回一个逻辑回归模型。

Pipeline建立起来比较简单:简单的声明它的步骤,配置参数,并将在一个pipeline object中进行封装。下面的代码演示了一个简单文本分类pipeline,由1个分词组件、1个哈希Term Frequency特征抽取组件,以及1个逻辑回归。

val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

Pipeline的本身就是个Estimator,因此我们可以轻松的使用。

val model = pipeline.fit(trainingDataset)

拟合模型包括了分词组件、哈希TF特征抽取组件,以及拟合逻辑回归模型。下面的图表绘制了整个工作流,虚线部分只在pipeline fitting中发生。

pipeline-1

这个拟合Pipeline模型是个Transformer,可以被用于预测、模型验证和模型检验。

model.transform(testDataset)
  .select('text, 'label, 'prediction)
  .collect()
  .foreach(println)

在 ML算法上,有一个麻烦的事情就是它们有许多hyperparameters需要被调整。同时,这些hyperparameters与被MLlib优化的模型参数完全不同。当然,如果缺乏数据和算法上的专业知识,我们很难发现这些hyperparameters组合的最优组合。然而,即使有专业知识,随着pipeline和hyperparameters规模的增大,这个过程也将变得异常复杂。而在实践中,hyperparameters的调整却通常与最终结果戚戚相关。举个例子,在下面的pipeline中,我们有两个hyperparameters需要调优,我们分别赋予了3个不同的值。因此,最终可能会产生9个不同的组合,我们期望从中找到一个最优组合。

在这里,spark支持hyperparameter的交叉验证。交叉验证被作为一个元方法,通过用户指定参数组合让其适合底层Estimator。这里的Estimator可以是一个pipeline,它可以与 Evaluator组队并输出一个标量度量用于预测,比如精度。调优一个Pipeline是非常容易的:

// Build a parameter grid.
val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 20, 40))
  .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
  .build()

// Set up cross-validation.
val cv = new CrossValidator()
  .setNumFolds(3)
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(new BinaryClassificationEvaluator)

// Fit a model with cross-validation.
val cvModel = cv.fit(trainingDataset)

当然,在一个ML pipeline中,用户可以嵌入自己的transformers或者estimators是非常重要的(建立在用户已经实现了pipeline接口的情况下)。这个API让MLlib外部代码的使用和共享变得容易,我们建议用户去阅读 spark.ml user guide以获得关于pipeline API的更多信息。

结语

本篇博文介绍了Spark 1.2中引入的ML pipeline API,以及这个API的运行原理——需要由 SPARK-3530、SPARK-3569、 SPARK-3572、SPARK-4192和SPARK-4209多个JIRAs完成。我们建议用户阅读JIRA页面上公布的设计文档以获得更多消息和设计选择。需要提及的是,Pipeline API的开发并没有全部完成。同时,在Pipeline API之外,还有一些相关的工作需要完成,比如:SPARK-5097,需要添加一个data frame APIs到SchemaRDD;SPARK-4586,需要一个ML pipeline Python API;SPARK-3702,用于学习算法和模型的类层次结构。

英文原文:ML Pipelines: A New High-Level API for MLlib

文章出处:http://www.tuicool.com/articles/eyemqmB

[repost ]双倍提升Apache Spark排序性能

original:http://dataunion.org/9206.html

作者:Sandy Ryza 译者:孙薇

区别常见的Embarrassingly Parallel系统,类似MapReduce和Apache Spark(Apache Hadoop的下一代数据处理引擎)这样的计算引擎主要区别在于对“all-to-all” 操作的支持上。和许多分布式引擎一样,MapReduce和Spark的操作通常针对的是被分片数据集的子分片,很多操作每次只处理单个数据节点,同时这些操作所涉及到的数据往往都只存在于这个数据片内。all-to-all操作必须将数据集看作一个整体,而每个输出结果都可以总结自不同分片上的记录。Spark的groupByKey、sortByKey,还有reduceByKey这些shuffle功能都属于这方面常见的操作。

在这些分布式计算引擎中,shuffle指的是在一个all-to-all操作中将数据再分割和聚合的操作。显而易见,在实践生产中,我们在Spark部署时所发现的大多性能、可扩展性及稳定性问题都是在shuffle过程中产生的。

Cloudera和英特尔的工程师们正通力合作以扩展Spark的shuffle,使得shuffle可以更加快速与稳定地处理大量的数据集。Spark在很多方面相较MapReduce有更多优势,同时又在稳定性与可扩展性上相差无几。在此,我们从久经考验的MapReduce shuffle部署中吸取经验,以提高排序数据输出的shuffle性能。

在本文中,我们将会逐层解析——介绍目前Spark shuffle的运作实现模式,提出修改建议,并对性能的提高方式进行分析。更多的工作进展可以于正在进行中的SPARK-2926发现。

Spark目前的运作实现模式

一个shuffle包含两组任务:1. 产生shuffle数据的阶段;2.使用shuffle数据的阶段。鉴于历史原因,写入数据的任务被称做“map task”,而读取数据的任务被称做“reduce tasks”,但是以上角色分配只局限于单个job的某个具体shuffle过程中。在一个shuffle中扮演reduce的task,在另一个shuffle中可能就是map了,因为它在前者里面执行的是读取操作,而在后者中执行的是数据写入任务,并在随后的阶段中被消费。

MapReduce和Spark的shuffle都使用到了“pull”模式。在每个map任务中,数据被写入本地磁盘,然后在reduce任务中会远程请求读取这些数据。由于shuffle使用的是all-to-all模式,任何map任务输出的记录组都可能用于任意reduce。一个job在map时的shuffle操作基于以下原则:所有用于同一个reduce操作的结果都会被写入到相邻的组别中,以便获取数据时更为简单。

Spark默认的shuffle实现(即hash-based shuffle)是map阶段为每个reduce任务单独打开一个文件,这种操作胜在简单,但实际中却有一些问题,比如说实现时Spark必须维持大量的内存消耗,或者造成大量的随机磁盘I/O。此外,如果M和R分别代表着一个shuffle操作中的map和reduce数量,则hash-based shuffle需要产生总共MR个数量的临时文件,Shuffle consolidation将这个数量减至CR个(这里的C代表的是同时能够运行的map任务数量),但即便是经过这样的修改之后,在运行的reducer数量过多时还是经常会出现“文件打开过多”的限制。

Hash-based shuffle中单个map任务

Sort-based shuffle中单个map任务

为了进一步提高shuffle的稳定性与性能,从1.1版本开始,Spark引入了“sort-based shuffle”实现,其功能与MapReduce使用的map方式十分类似。在部署时,每个任务的map输出结果都会被储存在内存里(直到可用内存耗尽),然后在reduce任务中进行排序,之后再spill到一个单独的文件。如果在单个任务中该操作发生了多次,那么这个任务的输出将被合并。

在reduced的过程中,一组线程负责抓取远程的map输出blocks。当数据进入后,它们会被反序列化,再转化成一个适用于执行all-to-all操作的数据结构。在类似groupByKey、reduceByKey,还有aggregateByKey之类的聚合操作中,其结果会变成一个ExternalAppendOnlyMap(本质上是一个内存溢出时会spill到硬盘的哈希map)。在类似sortByKey的排序操作中,输出结果会变成一个ExternalSorter(将结果分类后可能会spill到硬盘,并在对结果进行排序后返回一个迭代程序)。

完全Sort-based Shuffle

上文所描述的方式有两个弊端:

  • 每个Spark reduce的任务都需要同时打开大量的反序列化记录,从而导致内存的大量消耗,而大量的Java对象对JVM的垃圾收集(garbage collection)产生压力,会造成系统变慢和卡顿,同时由于这个版本较之序列化的版本内存消耗更为巨大,因而Spark必须更早更频繁的spill,造成硬盘I/O也更为频繁。此外,由于判断反序列化对象的内存占用情况时难以达到100%的准确率,因此保持大量的反序列化对象会加剧内存不足的可能性。
  • 在引导需要在分片内的排序操作时,我们需要进行两次排序:mapper时按分片排序,reducer时按Key排序。

我们修改了map时在分片内按Key对结果进行排序,这样在reduce时我们只要合并每个map任务排序后的吧blocks即可。我们可以按照序列化的模式将每个block存到内存中,然后在合并时逐一地将结果反序列化。这样任何时候,内存中反序列化记录的最大数量就是已经合并的blocks总量。

完全sort-based shuffle中的单个map任务

单个reduce任务可以接收来自数以千计map任务的blocks,为了使得这个多路归并更加高效,尤其是在数据超过可用内存的情况下,我们引入了分层合并( tiered merger)的概念。如果需要合并许多保存在磁盘上的blocks,这样做可以最小化磁盘寻道数量。分层合并同样适用于ExternalAppendOnlyMap以及ExternalSorter的内部合并步骤,但是暂时我们还没有进行修改。

高性能合并

每个任务中有一组线程是负责同步抓取shuffle数据的,每个任务对应的内存池有48MB,用来存放相应的数据。

我们引入了SortShuffleReader,先从内存池中获取到blocks,然后[key, value]的方式向用户代码中返回迭代器对象。

Spark有一个所有任务共享的shuffle内存区域,默认大小是完整executor heap的20%。当blocks进入时,SortShuffleReader会尝试从该主区域中调用shuffle所需的内存,直至内存塞满调用失败为止,然后我们需要将数据spill到硬盘上以释放内存。SortShuffleReader将所有(好吧,并非所有的,有时候只会spill一小部分)内存中的数据块写入一个单独的文件中并存入硬盘。随着blocks被存入硬盘,一个后台线程会对其进行监视,并在必要时将这些文件合并为更大一些的磁盘blogs。“final merge”会将所有最终硬盘与内存中的blocks全部合并起来。

如何确定是时候进行一个临时的“磁盘到磁盘”合并?

spark.shuffle.maxMergeFactor(默认为100)控制着一次可以合并的硬盘blocks数量的最大值,当硬盘blocks的数量超过限制时,后台线程会运行一次合并以降低这个数量(但是不会马上奏效,详情请查看代码)。在确定需要合并多少blocks时,线程首先会将需要执行合并的blocks数量设定为最小值,并将这个值作为合并数量的上限,以期尽可能减少blocks的合并次数。因此,如果spark.shuffle.maxMergeFactor是100,而磁盘blocks的最终数量为110,这样只需总共进行11个blocks的合并,就可将最终磁盘blocks的数量保持在恰好100。想要再合并哪怕一个blocks,都会需要再一次的额外合并,而可能导致不必要的磁盘I/O。

maxMergeWidth为4的分层合并。每个矩形代表一个segment,其中三个合并为一个,然后最终有四个segment被合并到一个迭代器中,以备下一次操作使用。

与sortByKey的性能对比

我们测试了使用SparkPerf进行sortbykey时,在相应的修改后,性能有何变化。在其中我们选择了两个不同大小的数据集,以比较我们的改动在内存足以支持所有shuffle数据时,和不足以支持的情况下对于性能的增益情况。

Spark的sortByKey变化导致两个job和三个stage。

  • Sample stage:进行数据取样以创建一个分区范围,分区大小相等。
  • Map阶段:写入为reduce阶段准备的shuffle bucket。
  • Reduce阶段:得到相关的shuffle结果,按特定的数据集分区进行合并/分类。

引入一个6节点集群的基准,每个executor包含24个core和36GB的内存,大数据集有200亿条记录,压缩后在HDFS上占409.8GB。小数据集有20亿条记录,压缩后在HDFS上占15.9GB。每条记录都包含一对10个字符串的键值对,在两个case中,我们在超过1000个分片中测试了排序,每个stage的运行时间表以及总共的job如下图显示:

大数据集(越低则越好)

小数据集(越低则越好)

取样阶段耗时相同,因为此阶段并不涉及shuffle过程;在map阶段,在我们的改进下,每个分片中按Key对数据进行排序,导致这个阶段的运行时间增加了(大数据集增加了37%,小数据集则是27%)。但是增加的时间在reduce阶段得到了更大的补偿,由于现在只需合并排序后的数据,Reduce阶段的两个数据集的耗时共减少了66%,从而使得大数据集加速27%,小数据集加速17%。

下面还有什么?

SPARK-2926是Spark shuffle的几个改进计划的成果之一,在这个版本中很多方面上shuffle可以更好地管理内存:

  • SPARK-4550 用内存缓冲中的map输出数据作为原始数据,取代Java对象。map输出数据的空间消耗更少,从而使得spill更少,在原始数据的对比上更快。
  • SPARK-4452 更详细地追踪不同shuffle数据结构的内存分配,同时将无需消耗的内存尽早返还。
  • SPARK-3461 追踪agroupBy后出现的特定Key值相应字符串或者节点,而不是一次将其全部loading入内存。

作者简介:Sandy Ryza是Cloudera公司的数据科学家、Hadoop提交者,同时也是Spark的贡献者之一。他还是Advanced Analytics with Spark一书的作者之一。

Saisai(Jerry)Shao是一名英特尔公司的软件工程师,同时也是Spark的贡献者之一。

原文链接:Improving Sort Performance in Apache Spark: It’s a Double

文章出处:http://www.dataguru.cn/article-6524-1.html

[repost ]为什么Spark发展不如Hadoop?

original:http://dataunion.org/3126.html

一说大数据,人们往往想到Hadoop。这固然不错,但随着大数据技术的深入应用,多种类型的数据应用不断被要求提出,一些Hadoop被关注的范畴开始被人们注意,相关技术也迅速获得专业技术范畴的应用。最近半年来的Spark之热就是典型例子。

Spark是一个基于RAM计算的开源码ComputerCluster运算系统,目的是更快速地进行数据分析。Spark早期的核心部分代码只有3万行。Spark提供了与HadoopMap/Reduce相似的分散式运算框架,但基于RAM和优化设计,因此在交换式数据分析和datamining的Workload中表现不错。

进入2014年以后,Spark开源码生态系统大幅增长,已成为大数据范畴最活跃的开源码项目之一。Spark之所以有如此多的关注,塬因主要是因为Spark具有的高性能、高灵活性、与Hadoop生态系统完美融合等叁方面的特点。

首先,Spark对分散的数据集进行抽样,创新地提出RDD(ResilientDistributedDataset)的概念,所有的统计分析任务被翻译成对RDD的基本操作组成的有向无环图(DAG)。RDD可以被驻留在RAM中,往后的任务可以直接读取RAM中的数据;同时分析DAG中任务之间的依赖性可以把相邻的任务合并,从而减少了大量不准确的结果输出,极大减少了HarddiskI/O,使复杂数据分析任务更高效。从这个推算,如果任务够复杂,Spark比Map/Reduce快一到两倍。

其次,Spark是一个灵活的运算框架,适合做批次处理、工作流、交互式分析、流量处理等不同类型的应用,因此Spark也可以成为一个用途广泛的运算引擎,并在未来取代Map/Reduce的地位。

最后,Spark可以与Hadoop生态系统的很多组件互相操作。Spark可以运行在新一代资源管理框架YARN上,它还可以读取已有并存放在Hadoop上的数据,这是个非常大的优势。

虽然Spark具有以上叁大优点,但从目前Spark的发展和应用现状来看,Spark本身也存在很多缺陷,主要包括以下几个方面:

–稳定性方面,由于代码质量问题,Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在RAM中,Java回收垃圾缓慢的情况严重,导致Spark性能不稳定,在复杂场景中SQL的性能甚至不如现有的Map/Reduce。

–不能处理大数据,单独机器处理数据过大,或者由于数据出现问题导致中间结果超过RAM的大小时,常常出现RAM空间不足或无法得出结果。然而,Map/Reduce运算框架可以处理大数据,在这方面,Spark不如Map/Reduce运算框架有效。

–不能支持复杂的SQL统计;目前Spark支持的SQL语法完整程度还不能应用在复杂数据分析中。在可管理性方面,SparkYARN的结合不完善,这就为使用过程中埋下隐忧,容易出现各种难题。

虽然Spark活跃在Cloudera、MapR、Hortonworks等众多知名大数据公司,但是如果Spark本身的缺陷得不到及时处理,将会严重影响Spark的普及和发展。

 

文章出处:HKITBLOG

[repost ]教你如何成为Spark大数据高手?

original:http://dataunion.org/2697.html

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。

Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库、流处理和图计算等多种计算范式,是罕见的全能选手。Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理、图技术、机器学习、NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位。

伴随Spark技术的普及推广,对专业人才的需求日益增加。Spark专业人才在未来也是炙手可热,轻而易举可以拿到百万的薪酬。而要想成为Spark高手,也需要一招一式,从内功练起:通常来讲需要经历以下阶段:

第一阶段:熟练的掌握Scala语言

Spark框架是采用Scala语言编写的,精致而优雅。要想成为Spark高手,你就必须阅读Spark的源代码,就必须掌握Scala,;

虽然说现在的Spark可以采用多语言Java、Python等进行应用程序开发,但是最快速的和支持最好的开发API依然并将永远是Scala方式的API,所以你必须掌握Scala来编写复杂的和高性能的Spark分布式程序;

尤其要熟练掌握Scala的trait、apply、函数式编程、泛型、逆变与协变等;

第二阶段:精通Spark平台本身提供给开发者API

掌握Spark中面向RDD的开发模式,掌握各种transformation和action函数的使用;

掌握Spark中的宽依赖和窄依赖以及lineage机制;

掌握RDD的计算流程,例如Stage的划分、Spark应用程序提交给集群的基本过程和Worker节点基础的工作原理等

第三阶段:深入Spark内核

此阶段主要是通过Spark框架的源码研读来深入Spark内核部分:

通过源码掌握Spark的任务提交过程;

通过源码掌握Spark集群的任务调度;

尤其要精通DAGScheduler、TaskScheduler和Worker节点内部的工作的每一步的细节;

第四阶级:掌握基于Spark上的核心框架的使用

Spark作为云计算大数据时代的集大成者,在实时流处理、图技术、机器学习、NoSQL查询等方面具有显著的优势,我们使用Spark的时候大部分时间都是在使用其上的框架例如Shark、Spark Streaming等:

Spark Streaming是非常出色的实时流处理框架,要掌握其DStream、transformation和checkpoint等;

Spark的离线统计分析功能,Spark 1.0.0版本在Shark的基础上推出了Spark SQL,离线统计分析的功能的效率有显著的提升,需要重点掌握;

对于Spark的机器学习和GraphX等要掌握其原理和用法;

第五阶级:做商业级别的Spark项目

通过一个完整的具有代表性的Spark项目来贯穿Spark的方方面面,包括项目的架构设计、用到的技术的剖析、开发实现、运维等,完整掌握其中的每一个阶段和细节,这样就可以让您以后可以从容面对绝大多数Spark项目。

第六阶级:提供Spark解决方案

彻底掌握Spark框架源码的每一个细节;

根据不同的业务场景的需要提供Spark在不同场景的下的解决方案;

根据实际需要,在Spark框架基础上进行二次开发,打造自己的Spark框架;

前面所述的成为Spark高手的六个阶段中的第一和第二个阶段可以通过自学逐步完成,随后的三个阶段最好是由高手或者专家的指引下一步步完成,最后一个阶段,基本上就是到”无招胜有招”的时期,很多东西要用心领悟才能完成。

 

文章出处:网络大数据

[repost ]用 LDA 做主题模型:当 MLlib 邂逅 GraphX

original:http://dataunion.org/15196.html

主题模型可以从一系列文章中自动推测讨论的主题。这些主题可以被用作总结和整理文章,也可以在机器学习流程的后期阶段用于特征化和降维。

在Spark 1.3中,MLlib现在支持最成功的主题模型之一,隐含狄利克雷分布(LDA)。LDA也是基于GraphX上构建的第一个MLlib算法。在这篇博文中,我们概述LDA和及其用例,并且解释GraphX是实现它最自然的方式。

主题模型

抽象地说,主题模型旨在一系列文章中找到一种结构。学习到这种“结构”之后,一个主题模型能回答以下这样的问题:X文章讨论的是什么?X文章和Y文章有多相似?如果我对Z文章感兴趣,我应该先读哪些文章?

LDA

主题模型是一个比较广的领域。Spark 1.3加入了隐含狄利克雷分布(LDA),差不多是现今最成功的主题模型。最初被开发用于文本分析和群体遗传学,LDA之后被不断拓展,应用到从时间序列分析到图片分析等问题。首先,我们从文本分析的角度描述LDA。

什么是主题?主题不是LDA的输入,所以LDA必须要从纯文本中推断主题。LDA将主题定义为词的分布。例如,当我们在一个20个新闻组的文章数据集上运行MLlib的LDA,开始的几个主题是:

看下三个主题中的高权重词语,我们可以很快了解每个主题在说什么:运动,空间探索和电脑。LDA的成功很大程度上源自它产生可解释主题的能力。

用例

除了推断出这些主题,LDA还可以推断每篇文章在主题上的分布。例如,X文章大概有60%在讨论“空间探索”,30%关于“电脑”,10%关于其他主题。

这些主题分布可以有多种用途:

  • 聚类: 主题是聚类中心,文章和多个类簇(主题)关联。聚类对整理和总结文章集合很有帮助。
    • 参看Blei教授和Lafferty教授对于Science杂志的文章生成的总结。点击一个主题,看到该主题下一系列文章。
  • 特征生成:LDA可以生成特征供其他机器学习算法使用。如前所述,LDA为每一篇文章推断一个主题分布;K个主题即是K个数值特征。这些特征可以被用在像逻辑回归或者决策树这样的算法中用于预测任务。
  • 降维:每篇文章在主题上的分布提供了一个文章的简洁总结。在这个降维了的特征空间中进行文章比较,比在原始的词汇的特征空间中更有意义。

在MLlib中使用LDA

我们给出一个使用LDA的小例子。我们在这儿描述这个过程,实际的代码在这个Github gist上。本例首先读取并预处理文章。预处理最重要的部分是选择词典。在本例中,我们将文本拆成词,之后去除(a)非字母词 (b)4个字符一下的短词 (c)最常见的20个词(停用词)。一般来说,在你自己的数据集上调整这个预处理步骤很重要。

我们运行LDA,使用10个主题和10轮迭代。根据你的数据集选择主题的数量很重要。其他参数设成默认,我们在Spark文档的Markdown文件(spark/docs/*.md)上训练LDA。

我们得到10个主题。下面是5个人工挑选出来的主题,每个主题配以最重要的5个词语。请注意每个主题有多么清晰地对应到Spark的一个组件!(打引号的主题标题是为了更清晰手动加的)

在Spark 1.3中LDA有Scala和Java的API。Python的API很快会加入。

实现:GraphX

有许多算法可以训练一个LDA模型。我们选择EM算法,因为它简单并且快速收敛。因为用EM训练LDA有一个潜在的图结构,在GraphX之上构建LDA是一个很自然的选择。

LDA主要有两类数据:词和文档。我们把这些数据存成一个偶图(如下所示),左边是词节点,右边是文档节点。每个词节点存储一些权重值,表示这个词语和哪个主题相关;类似的,每篇文章节点存储当前文章讨论主题的估计。

每当一个词出现在一篇文章中,图中就有一个边连接对应的词节点和文章节点。例如,在上图中,文章1包含词语“hockey” 和“system”

这些边也展示了这个算法的流通性。每轮迭代中,每个节点通过收集邻居数据来更新主题权重数据。下图中,文章2通过从连接的词节点收集数据来更新它的主题估计。

GraphX因此是LDA自然的选择。随着MLlib的成长,我们期望未来可以有更多图结构的学习算法!

可扩展性

LDA的并行化并不直观,已经有许多研究论文提出不同的策略来实现。关键问题是所有的方法都需要很大量的通讯。这在上图中很明显:词和文档需要在每轮迭代中用新数据更新相邻节点,而相邻节点太多了。

我们选择了EM算法的部分原因就是它通过很少轮的迭代就能收敛。更少的迭代,更少的通讯。

在Spark中加入LDA之前,我们在一个很大的Wikipedia数据集上做了测试。下面是一些数字:

  • 训练集规模:460万文档
  • 词典规模:110万词汇
  • 训练集规模:11亿词(大约239词/文章)
  • 100个主题
  • 16个 worker节点的EC2集群
  • 计时结果:10轮迭代中平均176秒/迭代

接下来是?

Spark的贡献者正在开发更多LDA算法:在线变分贝叶斯(一个快速近似算法)和吉布斯采样(一个更慢但是有时更准确的算法)。我们也在增加帮助模块,例如用于自动数据准备的Tokenizers和更多预测方法。

想开始用LDA,今天下载Spark 1.3

查看例子,了解API的细节,查看MLlib文档

致谢

LDA的开发是许多Spark贡献者的合作的结果:

更多资源

通过这些综述学习更多关于主题模型和LDA的内容:

从这些研究论文中获得深入了解:

 

翻译:东狗
英文出处:data bricks

文章出处:http://www.multiprocess.net/2015/04/2338.html

注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。