精華|使用教程|編輯:龔雪|2017-03-24 10:05:38.000|閱讀 240 次
概述: Aggregator是MaxCompute-GRAPH作業(yè)中常用的feature之一,特別是解決機(jī)器學(xué)習(xí)問(wèn)題時(shí)。MaxCompute-GRAPH中Aggregator用于匯總并處理全局信息。本文將詳細(xì)介紹的Aggregator的執(zhí)行機(jī)制、相關(guān)API,并以Kmeans Clustering為例子說(shuō)明Aggregator的具體用法。
# 界面/圖表報(bào)表/文檔/IDE等千款熱門軟控件火熱銷售中 >>
如圖1所示,Aggregator的邏輯分兩部分,一部分在所有Worker上執(zhí)行,即分布式執(zhí)行,另一部分只在AggregatorOwner所在Worker上執(zhí)行,即單點(diǎn)。其中在所有Worker上執(zhí)行的操作包括創(chuàng)建初始值及局部聚合,然后將局部聚合結(jié)果發(fā)送給AggregatorOwner所在Worker上。AggregatorOwner所在Worker上聚合普通Worker發(fā)送過(guò)來(lái)的局部聚合對(duì)象,得到全局聚合結(jié)果,然后判斷迭代是否結(jié)束。全局聚合的結(jié)果會(huì)在下一輪超步分發(fā)給所有Worker,供下一輪迭代使用。
圖1 Aggregator機(jī)制
Aggregator共提供了五個(gè)API供用戶實(shí)現(xiàn)。下面逐個(gè)介紹5個(gè)API的調(diào)用時(shí)機(jī)及常規(guī)用途。
該API在所有Worker上執(zhí)行一次,調(diào)用時(shí)機(jī)是所有超步開始之前,通常用以初始化AggregatorValue。在第0輪超步中,調(diào)用WorkerContext.getLastAggregatedValue() 或ComputeContext.getLastAggregatedValue()可以獲取該API初始化的AggregatorValue對(duì)象。
該API在所有Worker上每輪超步開始時(shí)調(diào)用一次,用以初始化本輪迭代所用的AggregatorValue。通常操作是通過(guò)WorkerContext.getLastAggregatedValue() 得到上一輪迭代的結(jié)果,然后執(zhí)行部分初始化操作。
該API同樣在所有Worker上執(zhí)行,與上述API不同的是,該API由用戶顯示調(diào)用ComputeContext#aggregate(item)來(lái)觸發(fā),而上述兩個(gè)API,則由框架自動(dòng)調(diào)用。該API用以執(zhí)行局部聚合操作,其中第一個(gè)參數(shù)value是本W(wǎng)orker在該輪超步已經(jīng)聚合的結(jié)果(初始值是createInitialValue返回的對(duì)象),第二個(gè)參數(shù)是用戶代碼調(diào)用ComputeContext#aggregate(item)傳入的參數(shù)。該API中通常用item來(lái)更新value實(shí)現(xiàn)聚合。所有aggregate執(zhí)行完后,得到的value就是該Worker的局部聚合結(jié)果,然后由框架發(fā)送給AggregatorOwner所在的Worker。
該API執(zhí)行于AggregatorOwner所在Worker,用以合并各Worker局部聚合的結(jié)果,達(dá)到全局聚合對(duì)象。與aggregate類似,value是已經(jīng)聚合的結(jié)果,而partial待聚合的對(duì)象,同樣用partial更新value。
假定有3個(gè)worker,分別是w0、w1、w2,其局部聚合結(jié)果是p0、p1、p2。假定發(fā)送到AggregatorOwner所在Worker的順序?yàn)閜1、p0、p2。那么merge執(zhí)行次序?yàn)椋紫葓?zhí)行merge(p1, p0),這樣p1和p0就聚合為p1',然后執(zhí)行merge(p1', p2),p1'和p2聚合為p1'',而p1''即為本輪超步全局聚合的結(jié)果。
從上述示例可以看出,當(dāng)只有一個(gè)worker時(shí),不需要執(zhí)行merge方法,也就是說(shuō)merge()不會(huì)被調(diào)用。
當(dāng)AggregatorOwner所在Worker執(zhí)行完merge()后,框架會(huì)調(diào)用terminate(context, value)執(zhí)行最后的處理。其中第二個(gè)參數(shù)value,即為merge()最后得到全局聚合,在該方法中可以對(duì)全局聚合繼續(xù)修改。執(zhí)行完terminate()后,框架會(huì)將全局聚合對(duì)象分發(fā)給所有Worker,供下一輪超步使用。
terminate()方法的一個(gè)特殊之處在于,如果返回true,則整個(gè)作業(yè)就結(jié)束迭代,否則繼續(xù)執(zhí)行。在機(jī)器學(xué)習(xí)場(chǎng)景中,通常判斷收斂后返回true以結(jié)束作業(yè)。
下面以典型的KmeansClustering作為示例,來(lái)看下Aggregator具體用法。附件有完整代碼,這里我們逐個(gè)部分解析代碼。
GraphLoader部分用以加載輸入表,并轉(zhuǎn)換為圖的點(diǎn)或邊。這里我們輸入表的每行數(shù)據(jù)為一個(gè)樣本,一個(gè)樣本構(gòu)造一個(gè)點(diǎn),并用Vertex的value來(lái)存放樣本。
我們首先定義一個(gè)Writable類KmeansValue作為Vertex的value類型。
public static class KmeansValue implements Writable { DenseVector sample; public KmeansValue() { } public KmeansValue(DenseVector v) { this.sample = v; } @Override public void write(DataOutput out) throws IOException { wirteForDenseVector(out, sample); } @Override public void readFields(DataInput in) throws IOException { sample = readFieldsForDenseVector(in); } }
KmeansValue中封裝一個(gè)DenseVector對(duì)象來(lái)存放一個(gè)樣本,這里DenseVector類型來(lái)自,而wirteForDenseVector()及readFieldsForDenseVector()用以實(shí)現(xiàn)序列化及反序列化,可參見(jiàn)附件中的完整代碼。
我們自定義的KmeansReader代碼如下:
public static class KmeansReader extends
GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
throws IOException {
KmeansVertex v = new KmeansVertex();
v.setId(recordNum);
int n = record.size();
DenseVector dv = new DenseVector(n);
for (int i = 0; i < n; i++) {
dv.set(i, ((DoubleWritable)record.get(i)).get());
}
v.setValue(new KmeansValue(dv));
context.addVertexRequest(v);
}
}
KmeansReader中,每讀入一行數(shù)據(jù)(一個(gè)Record)創(chuàng)建一個(gè)點(diǎn),這里用recordNum作為點(diǎn)的ID,將record內(nèi)容轉(zhuǎn)換成DenseVector對(duì)象并封裝進(jìn)VertexValue中。
自定義的KmeansVertex代碼如下。邏輯非常簡(jiǎn)單,每輪迭代要做的事情就是將自己維護(hù)的樣本執(zhí)行局部聚合。具體邏輯參見(jiàn)下面Aggregator的實(shí)現(xiàn)。
public static class KmeansVertex extends
Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {
@Override
public void compute(
ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
Iterable<NullWritable> messages) throws IOException {
context.aggregate(getValue());
}
}
整個(gè)Kmeans的主要邏輯集中在Aggregator中。首先是自定義的KmeansAggrValue,用以維護(hù)要聚合及分發(fā)的內(nèi)容。
public static class KmeansAggrValue implements Writable {
DenseMatrix centroids;
DenseMatrix sums; // used to recalculate new centroids
DenseVector counts; // used to recalculate new centroids
@Override
public void write(DataOutput out) throws IOException {
wirteForDenseDenseMatrix(out, centroids);
wirteForDenseDenseMatrix(out, sums);
wirteForDenseVector(out, counts);
}
@Override
public void readFields(DataInput in) throws IOException {
centroids = readFieldsForDenseMatrix(in);
sums = readFieldsForDenseMatrix(in);
counts = readFieldsForDenseVector(in);
}
}
KmeansAggrValue中維護(hù)了三個(gè)對(duì)象,其中centroids是當(dāng)前的K個(gè)中心點(diǎn),假定樣本是m維的話,centroids就是一個(gè)K*m的矩陣。sums是和centroids大小一樣的矩陣,每個(gè)元素記錄了到特定中心點(diǎn)最近的樣本特定維之和,例如sums(i,j)是到第i個(gè)中心點(diǎn)最近的樣本的第j維度之和。
counts是個(gè)K維的向量,記錄到每個(gè)中心點(diǎn)距離最短的樣本個(gè)數(shù)。sums和counts一起用以計(jì)算新的中心點(diǎn),也是要聚合的主要內(nèi)容。
接下來(lái)是自定義的Aggregator實(shí)現(xiàn)類KmeansAggregator,我們按照上述API的順序逐個(gè)看其實(shí)現(xiàn)。
首先是createStartupValue().
public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
KmeansAggrValue av = new KmeansAggrValue();
byte[] centers = context.readCacheFile("centers");
String lines[] = new String(centers).split("\n");
int rows = lines.length;
int cols = lines[0].split(",").length; // assumption rows >= 1
av.centroids = new DenseMatrix(rows, cols);
av.sums = new DenseMatrix(rows, cols);
av.sums.zero();
av.counts = new DenseVector(rows);
av.counts.zero();
for (int i = 0; i < lines.length; i++) {
String[] ss = lines[i].split(",");
for (int j = 0; j < ss.length; j++) {
av.centroids.set(i, j, Double.valueOf(ss[j]));
}
}
return av;
}
我們?cè)谠摲椒ㄖ谐跏蓟粋€(gè)KmeansAggrValue對(duì)象,然后從資源文件centers中讀取初始中心點(diǎn),并賦值給centroids。而sums和counts初始化為0。
接來(lái)下是createInitialValue()的實(shí)現(xiàn):
@Override
public KmeansAggrValue createInitialValue(WorkerContext context)
throws IOException {
KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0);
// reset for next iteration
av.sums.zero();
av.counts.zero();
return av;
}
該方法中,我們首先獲取上一輪迭代的KmeansAggrValue,然后將sums和counts清零,其實(shí)是只保留了上一輪迭代出的centroids。
用以執(zhí)行局部聚合的aggregate()實(shí)現(xiàn)如下:
@Override
public void aggregate(KmeansAggrValue value, Object item)
throws IOException {
DenseVector sample = ((KmeansValue)item).sample;
// find the nearest centroid
int min = findNearestCentroid(value.centroids, sample);
// update sum and count
for (int i = 0; i < sample.size(); i ++) {
value.sums.add(min, i, sample.get(i));
}
value.counts.add(min, 1.0d);
}
該方法中調(diào)用findNearestCentroid()(實(shí)現(xiàn)見(jiàn)附件)找到樣本item歐拉距離最近的中心點(diǎn)索引,然后將其各個(gè)維度加到sums上,最后counts計(jì)數(shù)加1。
以上三個(gè)方法執(zhí)行于所有worker上,實(shí)現(xiàn)局部聚合。接下來(lái)看下在AggregatorOwner所在Worker執(zhí)行的全局聚合相關(guān)操作。
首先是merge的實(shí)現(xiàn):
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial)
throws IOException {
value.sums.add(partial.sums);
value.counts.add(partial.counts);
}
merge的實(shí)現(xiàn)邏輯很簡(jiǎn)單,就是把各個(gè)worker聚合出的sums和counts相加即可。
最后是terminate()的實(shí)現(xiàn):
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)
throws IOException {
// Calculate the new means to be the centroids (original sums)
DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
// print old centroids and new centroids for debugging
System.out.println("\nsuperstep: " + context.getSuperstep() +
"\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);
boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
System.out.println("superstep: " + context.getSuperstep() + "/"
+ (context.getMaxIteration() - 1) + " converged: " + converged);
if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
// converged or reach max iteration, output centriods
for (int i = 0; i < newCentriods.numRows(); i++) {
Writable[] centriod = new Writable[newCentriods.numColumns()];
for (int j = 0; j < newCentriods.numColumns(); j++) {
centriod[j] = new DoubleWritable(newCentriods.get(i, j));
}
context.write(centriod);
}
// true means to terminate iteration
return true;
}
// update centriods
value.centroids.set(newCentriods);
// false means to continue iteration
return false;
}
teminate()中首先根據(jù)sums和counts調(diào)用calculateNewCentroids()求平均計(jì)算出新的中心點(diǎn)。然后調(diào)用isConverged()根據(jù)新老中心點(diǎn)歐拉距離判斷是否已經(jīng)收斂。如果收斂或迭代次數(shù)達(dá)到最大數(shù),則將新的中心點(diǎn)輸出并返回true,以結(jié)束迭代。否則更新中心點(diǎn)并返回false以繼續(xù)迭代。其中calculateNewCentroids()和isConverged()的實(shí)現(xiàn)見(jiàn)附件。
main方法用以構(gòu)造GraphJob,然后設(shè)置相應(yīng)配置,并提交作業(yè)。代碼如下:
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - start) / 1000.0 + " seconds");
}
這里需要注意的是job.setRuntimePartitioning(false),設(shè)置為false后,各個(gè)worker加載的數(shù)據(jù)不再根據(jù)Partitioner重新分區(qū),即誰(shuí)加載的數(shù)據(jù)誰(shuí)維護(hù)。
本文介紹了MaxCompute-GRAPH中的Aggregator機(jī)制,API含義以及示例Kmeans Clustering。總的來(lái)說(shuō),Aggregator基本步驟是,
1)每個(gè)worker啟動(dòng)時(shí)執(zhí)行createStartupValue用以創(chuàng)建AggregatorValue;
2)每輪迭代開始前,每個(gè)worker執(zhí)行createInitialValue來(lái)初始化本輪的AggregatorValue;
3)一輪迭代中每個(gè)點(diǎn)通過(guò)context.aggregate()來(lái)執(zhí)行aggregate()實(shí)現(xiàn)worker內(nèi)的局部迭代;
4)每個(gè)Worker將局部迭代結(jié)果發(fā)送給AggregatorOwner所在的Worker;
5)AggregatorOwner所在worker執(zhí)行多次merge,實(shí)現(xiàn)全局聚合;
6)AggregatorOwner所在Worker執(zhí)行terminate用以對(duì)全局聚合結(jié)果做處理并決定是否結(jié)束迭代。
更多行業(yè)資訊,更新鮮的技術(shù)動(dòng)態(tài),盡在。
本站文章除注明轉(zhuǎn)載外,均為本站原創(chuàng)或翻譯。歡迎任何形式的轉(zhuǎn)載,但請(qǐng)務(wù)必注明出處、不得修改原文相關(guān)鏈接,如果存在內(nèi)容上的異議請(qǐng)郵件反饋至chenjj@fc6vip.cn