Flink DataSet API #5 – 平行執行(Parallel Execution)

在這篇文章中,我們首先將介紹 Flink 中對其應用程式的執行配置,接著介紹如何設置應用程式的平行執行(parallel execution)及執行計畫(execution plans)。接著將介紹如何讓平行執行的 instances 共享變數;最後介紹執行計畫的視覺化工具。

執行設定(Execution Configuration)

先前在第一篇文章中曾提到,每一個 Flink 應用程式的基本構成要素中,必須取得一個 ExecutionEnvironment。在這個所取得的 ExecutionEnvironment 中,包含一個 ExecutionConfig 可以對執行時期的工作進行配置。這個 ExecutionConfig 可以如下取得:

val env = ExecutionEnvironment.getExecutionEnvironment
var executionConfig = env.getConfig

以下為可供設置的選項(粗體標示為預設配置選項):

  • enableClosureCleaner() / disableClosureCleaner():此選項預設為開啟。亦即在 Flink 應用程式執行時期,closure cleaner 會清除暱名函式不需要用到的 surrounding class 參照。當此選項關閉時,可能會發生以下情況:一個暱名的使用者自定義函式,參照一個通常不能序列化的 surrounding class,這將會導致 serializer 發生例外錯誤(exception)。
  • getParallelism() / setParallelism(int parallelism):設置預設的平行性;亦即,一個任務被切分為多少個分身平行執行。
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay):當一個工作發生錯誤後,系統應在重啟前等待多少毫秒(millisecond)。等待時間的計算,起始於 TaskManagers 成功停止所有該工作的任務。一旦等待間時到達,系統會重新啟動所有的任務。這個參數可以讓一些其他與 time-out 相關的錯誤,在試圖重新執行工作前顯現出來。此參數僅於執行重試次數大於或等於 1 時才會生效。
  • getExecutionMode() / setExecutionMode():預設的執行模式是 PIPELINED。設置應用程式的執行模式。執行模式定義了資料交換的方式為批次(batch)亦或是流水線(pipelined)。
  • enableForceKryo() / disableForceKryo():預設不採用 Kryo 進行序列化。強迫泛型資訊(generic type information)採用 Kryo 進行 POJOs 的序列化。在某些情況下,如:當 Flink 的 internal serializers 無法適當的處理 POJOs 時,可以設置此參數。
  • enableForceAvro() / disableForceAvro():預設不採用 Avro 進行序列化。強迫泛型資訊(generic type information)採用 Avro 而非 Kryo 進行 POJOs 的序列化。
  • enableObjectReuse() / disableObjectReuse():在 Flink 中預設不重用(reuse)物件。開啟物件重用模式,將會指示 Flink runtime 重用物件以追求性能。要注意的是,當操作子(operators)中使用者自定義的函式沒有意識到此模式已開啟,可能會導致一些 bugs。
  • enableSysoutLogging() / disableSysoutLogging():預設將 JobManager 的狀態更新列印至 System.out。這個行為可以經由此設置項關閉。
  • getGlobalJobParameters() / setGlobalJobParameters():此函式可以讓我們將自定義的物件,設置為 jobs 的全域設定。由於 ExecutionConfig 在所有使用者自定義函式中皆可存取,故此函式讓我們很容易的讓設置於 job 中得以全域存取。
  • addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer):為給定型態註冊一個預設的 Kryo seializer 物件。
  • addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass):為給定型態註冊一個預設的 Kryo seializer 類別。
  • registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer):為給定型態註冊採用 Kryo 進行序列化,並為其指定一個 serializer。如此可以使該型態的序列化更有效率。
  • registerKryoType(Class<?> type):如果某一型態最終會採用 Kryo 進行序列化,為其註冊將可確保僅 tags (整數值的 ID)會被寫入。否則,所有該型態的物件在序列化時,會將其整個類別名稱寫入,導致過高的 I/O 成本。
  • registerPojoType(Class<?> type):為給定型態的序列化註冊為採用 serialization stack。亦即,如果某一型態最終會採用 POJO 進行序列化,則為其註冊採用 POJO serializer。如果某一型態最終會採用 Kryo 進行序列化,則為其註冊採用 Kryo serializer。
  • disableAutoTypeRegistration():型態自動註冊功能預設是開啟的,用於為應用程式中所有用到的型態,註冊採用 Kryo 或 POJO serializer 進行序列化。

在所有以 Rich 為始的函式中,只要經由 getRuntimeContext() 函式取得 RuntimeContext,也可以使我們在自定義的函式中存取 ExecutionConfig。

平行執行(Parallel Execution)

在這節中,我們將介紹如何在 Flink 中,設置平行執行。一個 Flink 應用程式中,包含了多個任務,如:運算子(operators)、資料來源(data sources)、資料輸出(data sinks)等。一個任務被切分為多個分身平行執行,每個分身處理一小部份輸入到任務的資料。此時,我們稱一個任務的分身數量為其平行性(parallelism),且該平行性可以在 Flink 中以下四個不同層級來指定。

  • 運算子層級(operator level)
  • 執行環境層級(execution environment level)
  • 客戶端層級(client level)
  • 系統層級(system level)

運算子層級(Operator Level)

一個運算子、資料來源或資料輸出的平行性,都可以透過其 setParallelism() 成員函式來定義。例如:在第一篇文章的 WordCount 範例中,sum() 運算子的平行性可以如下被設置為 5。

val env = ExecutionEnvironment.getExecutionEnvironment
val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .groupBy(0)
    .sum(1).setParallelism(5)
wordCounts.print()
env.execute("Word Count Example")

執行環境層級(Execution Environment Level)

所有的 Flink 應用程式都在一個執行環境中執行,該環境會為它所執行的運算子、資料來源或資料輸出,定義預設的平行性。而這個平行性的設置可以透過以下方式覆寫:

val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .groupBy(0)
    .sum(1)wordCounts.print()
env.execute("Word Count Example")

客戶端層級(Client Level)

平行性也可以在客戶端將工作送出到 Flink 中執行時設置,該客戶端可以是 Java 或 Scala 應用程式。以下是一個 Flink CLI 用戶端的範例:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

而在一個 Scala 的應用程式中,平行性可以如下設置:

try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)
} catch {
    case e: Exception => e.printStackTrace
}

系統層級(System Level)

若欲針對系統中所有執行環境,皆設置具有相同的平行性,可以在 conf/flink-conf.yaml 檔案中,設置 parallelism.default 屬性。設置檔細節可以參閱官網文件

變數廣播(Broadcast Variables)

一個運算子(operator)中的平行運算,除了能取得運算子的輸入資料集外,我們可以經由變數的廣播,讓一個資料集也能被它們取得。這個功能對於運算過程中,扮演輔助性質的資料集非常有用。將能使其在運算子中以 Collection 的形式被存取。

  • Broadcast:Broadcast 資料集可以經由 withBroadcastSet(DataSet, String) 函式,以名稱進行註冊。
  • Access:我們可以透過 getRuntimeContext().getBroadcastVariable(String) 函式,取得已註冊的 Broadcast 資料集。

以下是一個簡單的範例程式:

// 1. The DataSet to be broadcasted
val toBroadcast = env.fromElements(1, 2, 3)
val data = env.fromElements("a", "b")
data.map(new RichMapFunction[String, String]() {
    var broadcastSet: Traversable[String] = null

    override def open(config: Configuration): Unit = {
      // 3. Access the broadcasted DataSet as a Collection
      broadcastSet = getRuntimeContext().getBroadcastVariable[String]("broadcastSetName").asScala
    }

    def map(in: String): String = {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet

在使用上要特別注意的是,Broadcast 資料集會存在於節點的記憶體中,故不適合過大的資料集。而對於像是純量(scalar)資料,就不需要 Broadcast,應該作為函式參數的一部份,或使用 withParameters(…) 函式傳遞帶有純量資料的 Configuration 物件。

執行計劃(Execution Plan)

取決於不同的情況,如:資料量及叢集(cluster)大小,Flink 的優化器(optimizer)會據以自動為應用程式選擇一個執行策略。在許多情境下,知道 Flink 的執行策略會如何執行應用程式會非常有用。

視覺化工具(Plan Visualization Tool)

Flink 附帶了一個應用程式執行策略的視覺化工具,其視覺化 HTML 文件存放在 tools/planVisualizer.html 。它接受以 JSON 表示的工作執行計劃,將之以圖(graph)的形式視覺化呈現,並具有完整的執行策略標註。以下的程式碼,展示了如何輸出該應用程式執行策略相對的 JSON 文件。

val env = ExecutionEnvironment.getExecutionEnvironment
...
println(env.getExecutionPlan())

接下來,我們就可以將上述所輸出的 JSON 文件,透過以下步驟視覺化:

  1. 用瀏灠器載入 planVisualizer.html
  2. 複製輸出的 JSON 文件,貼於面頁上的文字方塊中(text field)
  3. 點擊 draw 按鈕

之後就可以看到如下圖的視覺化結果:

網頁界面(Web Interface)

在 Flink 中提供了一個 Web 界面,可以用來提交並執行工作。如果使用這個界面來提交打包好的應用程式,便可以選擇是否顯示相對應的執行策略視覺化呈現。

我們可以透過 bin/start-webclient.sh 來啟動這個界面,預設通訊埠為 8080。啟動後便可以透過該界面來提交 Flink 應用程式,並會在左側的可供執行應用程式列表中出現該應用程式。我們可以在頁面最下面的 textbox 中,輸入應用程式的執行參數。在執行前撰擇 plan visualization 這個 checkbox,便可以呈現視覺化的執行策略。

廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s