Flink DataSet API #1 – 第一個範例

以下是一個簡單的word count範例程式

import org.apache.flink.api.scala._
object WordCount {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}

當我們欲使用Flink中的Scala API,必須包含以下其中之一的import:

import org.apache.flink.api.scala._

import org.apache.flink.api.scala.createTypeInformation

其原因是,Flink會對程式中所使用的型態(type)進行分析,為之產生相對應的序列器(serializer)及比較器(comparator)。透過以上的import,我們引入了隱式轉換(implicit conversion)來為Flink的運算子建立與型態的相關資訊。

如上word count範例所示,每一個Flink程式有以下幾個基本的構成要素:

  • 取得一個ExecutionEnvironment。
  • 載入或建立欲被處理的資料。
  • 對該資料進行一系列的轉化(transformation)。
  • 將運算的結果進行儲存。
  • 觸發程式的執行。

所有Scala API的核心類別都可以在org.apache.flink.api.scala中找到。

ExecutionEnvironment是所有Flink程式的基礎,我們可以透過以下在ExecutionEnvironment類別中所定義的靜態函式取得相對應的物件:

def getExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
def createLocalEnvironment(customConfiguration: Configuration)
def createCollectionsEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)

一般而言,我們僅需要使用getExecutionEnvironment()來取得ExecutionEnvironment的物件即可。因為它將會視以下不同情境,來決定要準備一個什麼樣的執行環境。

  • 當我們在IDE中或在一般環境中執行Flink程式時,ExecutionEnvironment將會於我們的電腦中建立一個local環境好讓程式在其中執行。
  • 當我們從Flink程式建立了一個JAR檔,經由command line或web interface執行它時,Flink cluster manager將會執行主函式(main function),而位於其中的getExecutionEnvironment()會回傳一個能在cluster上執行的環境。
廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s