Flink DataSet API #4 – 迭代操作(Iterations)

這篇文章將介紹 Flink 中對 DataSet 進行迭代操作(iteration operations)的方法:批次迭代(Bulk iteration)以及差異性迭代(Delta iteration)。

迭代演算法(iterative algorithm)在資料分析的許多領域中都會需要用到,例如:機器學習(machine learning)或圖形分析(graph analysis)。要從大量數據中粹取出有意義的資訊,迭代演算法是不可或缺的。為了加速其於大資料集上的分析,使迭代演算法能以大量並行的方式執行並突顯其重要性。

在 Flink 中的迭代操作封裝了某一部份需重覆執行的程式碼,實現了類似迴圈的概念。其透過定義 step function 並將之嵌入在一個特殊的迭代操作中,來實現迭代演算法。每一迭代所產生的結果,將會傳遞給下一次迭代。在 Flink 中有兩種迭代操作:批次迭代及差異性迭代。此兩種操作都會在當前的迭代狀態上,反覆的觸發給定的 step function,直到某個終結條件滿足為止。下表列出了批次迭代及 差異性迭代兩者的概觀:

批次迭代(Bulk Iterations)

首先要介紹的 Bulk Iterations 涵蓋了如下圖的簡單形態迭代:在每一次迭代中,step function 對輸入資料集(前次迭代的結果或迭代初始資料集)進行處理,並產生下一個版本的 partial solution(如:map、reduce、join 等)。

Iterate Operator

  1. Iteration Input:從一個資料集或前次操作中所得到的輸入資料集。
  2. Step Function:在每次迭代中所要執行的函式,用以處理輸入資料集,為任意的資料流。取決於我們的需求,其中可能會包含 map、reduce、join 等操作。
  3. Next Partial Solution:在每次迭代中,Step Function 所產生的最終結果,會作為下次迭代的 Iteration Input。
  4. Iteration Result:最後一次迭代所產生的結果,可以將之寫入 Data Sinks 中或用來作為後續程式中某些操作的輸入。

欲指定一個迭代的終止條件有以下方式:

  • 最大重覆次數(Maximum number of iterations):迭代執行的次數達到所指定的最大重覆次數時即終止。
  • 自定義的聚合收斂(Custom aggregator convergence):迭代中允許指定自定義的聚合運算子及收斂條件,當聚合運算子的輸出結果收斂到符合收斂條件時即終止。

以下為其虛擬碼:

IterationState state = getInitialState();
while (!terminationCriterion()) {
        state = step(state);}
setFinalState(state);

以下是一個對集合中所有數字進行增量的簡單範例:
Iterate Operator Example

  1. Iteration Input:初始輸入資料集從某資料來源讀取,包含5筆單一欄位的數值記錄,其欄位值為 1 至 5。
  2. Step Function:在此的 step function 中僅包含 map 這個操作,將輸入值加 1 。
  3. Next Partial Solution:而 step function 的輸出將會是 map 的輸出,亦即加 1 後的資料記錄。
  4. Iteration Result:在 10 次迭代後,初始資料集中所有數值記錄將被增量 10 次,產生 11 至 15 的結果。

欲使用批次迭代,我們只需要呼叫 DataSet 的 iterate(int) 這個成員方法,並指定一個 step function 即可。所指定的 step function 將會為當前的迭代取得輸入資料集,並傳回經 step function 作用後的新 DataSet。我們也可以使用 iterateWithTerminatioin(int) 這個成員方法,來指定一個回傳兩個新 DataSet 的 step function,一個為經 step function 作用後的新 DataSet,另一個為結束條件。一旦結束條件 DataSet 集合為空,則該迭代即刻停止。

以下是一個評估圓周率 Pi 的迭代範例程式,其目的在於計算落在單位圓內的亂數點數目。在每一次迭代中,會選擇一個亂數點,如果該點落在單位圓內,則將計數器加 1。最後,Pi 的估值為計數器的最終值除以總迭代次數乘以 4。

val env = ExecutionEnvironment.getExecutionEnvironment()
// Create initial DataSet
val initial = env.fromElements(0)
val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
  val result = iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
  result
}
val result = count map { c => c / 10000.0 * 4 }
result.print()
env.execute("Iterative Pi Example");

差異性迭代(Delta Iterations)

在 Flink 中的差異性迭代操作,涵蓋了增量式迭代(incremental iterations)的概念,僅對資料集中需變更的記錄進行修改,而非對所有記錄重新計算。有些演算法並不會在每次迭代時,都對資料集中的每一筆記錄進行更新,我們可以將注意力放在那些 hot records(須要變更的資料記錄)上,而不去理會那些 cold records(不需變更的資料記錄)。一般而言,solution set 會冷得相當快,而後面的迭代中僅需操作資料集中的小子集。

Delta Iterate Operator

  1. Iteration Input:初始的 Workset 和 solution set,從某個資料來源或前面的運算結果中讀取。
  2. Step Function:在每次迭代中所要執行的函式,用以處理輸入資料集。取決於實際的需求,其中可能會包含 map、reduce、join 等操作。
  3. Next Workset/Update Solution Set:Next Workset 驅動迭代運算並將被作為下一個迭代的輸入;此外,Solution Set 將會被更新並自動傳遞給下個迭代使用。以上兩個資料集都可能被 step function 中不同運算子更新。
  4. Iteration Result:最後一次迭代所產生的結果,可以將之寫入 Data Sinks 中或用來作為後續程式中某些操作的輸入。

差異性迭代操作的預設終結條件,經由空工作集收斂條件(empty workset convergence criterion)及最大迭代次數(maximum number of iterations)來指定。當所產生的下一個工作集(next workset)為空或已達最大迭代次數時,該迭代操作都會終止。我們也可以指定自定義的聚合運算子及收斂條件,當聚合運算子的輸出結果收斂到符合收斂條件時即終止。

以下為其虛擬碼:

IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
        (delta, workset) = step(workset, solution);

        solution.update(delta)}
setFinalState(solution);

以下是一個簡單的範例:

在這個範例中,每一個節點都有其 ID 及相對應的顏色,且會散播自己的 ID 給相鄰的節點。此範例的最終目的是要能指派最小的 ID 給一個子圖(subgraph)中的每一個節點。當所收到的 ID 比當前節點 ID 還小時,就將當前節點的背景色改為較小 ID 節點的背景顏色。上述範例演算法的應用有社交網路分析(social network analysis, SNA)中的社群偵測(community detection),或是計算 connected components。

Delta Iterate Operator Example

初始的輸入有 Workset 以及 solution set。上圖節點中所標注的背景顏色,視覺化 solution set 隨迭代演算法的演進過程。從圖中我們可以看到,最小 ID 節點的背景色,擴散至各自的子圖(subgraph)中。亦即,solution set 遞增;於此同時,Workset 的大小呈遞減(由 7 個節點遞減至 0 個)。

在圖中,上方子圖(節點 1~4)的最小 ID 節點為 1(橘色)。在第一次迭代中,與其相鄰的節點為 2,故將其 ID 1 傳遞給節點 2。這將會使得節點 2 變更其背景色為橘色,進而被收錄於 next Workset 中。對節點 3 和 4 而言,其相鄰節點為 2(黃色),故在這次迭代中,此兩節點的背景色都會被變更為黃色,並被收錄於 next Workset 中。由於節點 1 在本次迭代中沒有改變顏色,故不會被收錄在 next Workset 中。

而在下方子圖(節點 5~7)的最小 ID 節點為 5 (青綠色)。在第一次迭代中,與其相鄰的節點為 6,7,將會接收到 5。相同的,我們會略過 ID 5 這個沒有改變顏色的節點,不被收錄在 next Workset 中。因而,綜合上下兩子圖的結果,最終的 next Workset 中會包含節點 2,3,4,6,7。

在第二次迭代中,Workset 的大小已經從 7 個元素減少到 5 個元素(2,3,4,6,7)。下方子圖在這次迭代執行後已經收斂(沒有元素被包含在 Workset 中),亦即沒有資料會進行更新(cold part);反之,上方子圖尚需要更多的迭代(hot part)處理節點 3 及 4。最後,當第三次迭代執行完後,由於 Workset 為空集合,故停止演算法。

定義差異式迭代的方式與批次迭代類似,對於差異式迭代的每次迭代,都會輸入兩個 DataSet(Workset 和 Solution Set),也會輸出兩個 DataSet。欲使用差異式迭代,我們只需要呼叫初始 Solution Set 的 IterationDelta(initialWorkset, maxIterations, key) 成員函式。其中的 step function 接受兩個參數 (solutionSet, workset),且必須回傳兩個值:(solutionSetDelta, newWorkset)。以下是差異式迭代的一個簡單範例:

// read the initial data sets
val initialSolutionSet: DataSet[(Long, Double)] = // [...]
val initialWorkset: DataSet[(Long, Double)] = // [...]
val maxIterations = 100
val keyPosition = 0
val result = initialSolutionSet.iterateDelta(initialWorkset, maxIterations, Array(keyPosition)) {
  (solution, workset) =>
    val candidateUpdates = workset.groupBy(1).reduceGroup(new ComputeCandidateChanges())
    val deltas = candidateUpdates.join(solution).where(0).equalTo(0)(new CompareChangesToCurrent())

    val nextWorkset = deltas.filter(new FilterByThreshold())

    (deltas, nextWorkset)
}
result.writeAsCsv(outputPath)
env.execute()

Superstep Synchronization

如下圖所示,我們將 Flink 中迭代運算子中,每一次執行 step function 稱為一次迭代循環。在並行設置中,step functioin 可以於不同分區(partitions)中,以多個 step function 實例(instances)同時並行。在許多設定中,step function 在所有不同分區上的一次執行稱為一個 superstep,這也是同步的粒度(granularity of synchronization)。因此,在下一個 superstep 被初始化前,這一次迭代中所有並行任務需要完成該次的 superstep。結束條件也將會在 stepfunction barriers 進行評估。

Supersteps

 

廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s