Flink DataSet API #2 – 資料集的輸入與輸出

上一篇文章中,以一個簡單的word count範例為出發點,介紹了一個Flink應用程式的基本構成要素。接下來這篇文章,則是要介紹如何從外部獲取欲進行處理的資料,以及相關的資料集輸出方式。對資料集轉化(transformation)的相關操作,則會放在後續的文章中說明。

在本文中,首先我們會簡單介紹在Flink中表示資料集的DataSet類別。接著介紹如何從資料來源取得資料集,以及相關的資料集輸出操作方法。

資料集的抽象化-DataSet類別

DataSet類別是Flink中用來表達一個有限的,不可變更的,元素可重覆的,且具有相同資料型態的資料集合。Flink在執行時期,會不會實際建立每一個DataSet物件的決定,取決於所使用的runtime,設定值和最佳化決策。由於Flink底層使用串流資料處理引擎,所以DataSet物件可以是執行過程中的streamed through操作。Flink會在以下兩種情況下,自動產生資料集相對應的DataSet物件:

  • 避免分散式死結(distributed deadlock)的發生(在資料流程圖中產生分支,之後又再合併回來時可能發生的停滯現象)。
  • 當執行模式被明確的設置為blocking execution。

資料類型

由於 Flink 在執行時期會分析資料集元素的型別,據以決定較有效率的執行策略;故 Flink 對於資料集裏面的元素類型有著一些限制。以下介紹 6 個 Flink 中支援的資料類型:

  1. Tuples 和 Scala Case Class
    Scala case classes 和 Scala tuples 可以包含固定數量,彼此不同型別的欄位。Scala tuple 的欄位可以透過以 1 為起始索引的方式來存取,如 _1 表示第一個欄位。Case class 的欄位則可以透過其欄位名稱直接存取。

    case class WordCount(word: String, count: Int)
    val input = env.fromElements(
        WordCount("hello", 1),
        WordCount("world", 2)) // Case Class Data Set
    input.groupBy("word").reduce(...) // group by field expression "word"
    val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
    input2.groupBy(0, 1).reduce(...) // group by field positions 0 and 1
    
  2. Java POJOs (Plain Old Java Objects)
    當 Java 和 Scala 的類別滿足以下條件時,都會被 Flink 視為特別的 POJO 資料型別:

    • 類別必須是 public。
    • 必須有一個無輸入參數的 public 建構子(預設建構子)。
    • 所有的欄位都要能被存取,故為 public 或具有 getter 及/或 setter 操作函式。對一個 foo 欄位而言,其相對應的 getter 及 setter 操作函式,必須被命名為 getfoo() 及 setfoo()。
    • 欄位的型別必須是 Flink 有支援的。目前,Flink 採用 Avro 對任意物件(如:Date 物件)進行序列化。

    Flink 會分析 POJO 所屬型別的結構,亦即會學習一個 POJO 的欄位。與一般型別相比,POJO 在使用上較為容易,且 Flink 在處理 POJO 時也較為有較率。以下是一個簡單的具有兩個 public 欄位的 POJO 範例:

    class WordWithCount(var word: String, var count: Int) {
      def this() {
        this(null, -1)
      }
    }
    

    當我們在進行分群,排序,或聯合POJO類型的資料集時,所欲採用的 keys 可以直接用欄位名稱來表示,例如:

    wordCounts groupBy { _.word } reduce(new MyReduceFunction())
    
  3. 原生型別(Primitives Types)
    Flink 支援所有 Java 和 Scala 的原生型別,如:Integer, String, Double 等。
  4. 一般類別(Regular Classes)
    Flink 支援大部份 Java 和 Scala API 中所提供或自定義的類別,但有限制-類別中不得包含已序列化的欄位,如:檔案指標(file pointers),I/O streams,或其他本地資源。此外,一般來說 Flink 也支援遵循 Java Bean 傳統的那些類別。
    所有被認為非 POJO 類型的類別,Flink 會將其視為一般類別來處理。Flink 將這些資料型別視為黑箱(black box),並且無法存取其內容(亦即為了有效率的排序),且採用 Kryo 對其進行序列化或反序列化。當我們在進行分群,排序,或聯合一般類別所構成的資料集時,所欲採用的 keys 必須以 key selector 函式來指定。
  5. Values
    Value 此類型,需自行描述如何對其進行序列化及反序列化,而不是採用某 framework 來完成。透過實現 org.apache.flinktypes.Value 介面中的 read 及 write 函式,它們為序列化及反序列化操作,提供了自定義的程式碼。當一般的序列化方法將會非常沒有效率時,使用 Value 類型的物件會是合理的。例如:一個以陣列(array)實現了稀疏向量(sparse vector)的資料型別。由於我們知道向量中大部份元素為 0,故可以透過 Value 類型,採用一個特殊的編碼方法對其進行壓縮;反之,一般的序列化方式則是很簡單的寫入陣列中的所有元素。
  6. Hadoop Writables
    我們可以使用實現了 org.apache.hadoop.Writable 介面的型別,定義於其中的 write() 和 readFields() 函式的序列化邏輯,將會被用來進行序列化。

取得資料集

ExecutionEnvironment具備幾個函式,可以讓我們從檔案載入欲被處理的資料,包括逐行讀取,讀取CSV檔,或讀取自定義的資料格式等。以下為逐行讀取的方式:

val env = ExecutionEnvironment.getExecutionEnvironment()
val text = env.readTextFile("file:///path/to/file")

以上的程式碼片斷,可以從一個指定的檔案獲取相對應的資料集(data set)。我們來看一下與檔案相關的讀取函式:

  • readTextFile(path):以字串型態逐行讀取檔案。
  • readTextFileWithValue(path):逐行讀取檔案,並回傳StringValues的物件。
  • readCsvFile(path):讀取CSV格式的檔案,回傳相對應的tuples,case class object,或POJOs。
  • readFileOfPrimitives(path, delimiter):用給定的分隔字串來解析檔案內容。
  • readHadoopFile(FileInputFormat, Key, Value, path):建立一個JobConf物件,並以指定的FileInputFormat, Key, Value從指定路徑讀取檔案。最後回傳Tuple2<Key, Value>物件。
  • readSequenceFile(Key, Value, path):建立一個JobConf物件,並以指定的SequenceFileInputFormat, Key class, Value class從指定路徑讀取檔案。最後回傳Tuple2<Key, Value>物件。

以下是與Collection相關的資料集建構函式:

  • fromCollection(Seq):從一個Seq物件建構資料集,其中所有的元素都必須具備相同型態。
  • fromCollection(Iteration):從一個Iteration物件建構資料集,元素的資料型態由Iterator回傳。
  • fromElements(elements: _*):從給定的一連串物件建構資料集,所有的物件都必須具備相同型態。
  • fromParallelCollection(SplittableIterator):從一個Iteration物件,並行的建構資料集。元素的資料型態由Iterator回傳。
  • generateSequence(from, to):以平行處理的方式,從from, to兩數字區間中建構資料集。

以下是一般性的資料集建構函式:

  • readFile(inputFormat, path):接受一個檔案輸入格式。
  • createInput(inputFormat):接受一個自定義的輸入格式。

以下是幾個範例:

val env = ExecutionEnvironment.getExecutionEnvironment
// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)]("hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass]("hdfs:///the/CSV/file",
includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person]("hdfs:///the/CSV/file",
pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000);

// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
classOf[Text], "hdfs://nnHost:nnPort/path/to/file")

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
"hdfs://nnHost:nnPort/path/to/file")

除了一般的文字檔外,Flink也可以讓我們讀取以下兩種壓縮檔。

  • deflate:附檔名為 .deflate 的壓縮檔。
  • GZip:附檔名為 .gz 或 .gzip 的壓縮檔。

要注意的是,壓縮檔可能無法進行平行讀取,在使用上須考慮是否會影響對 job 的可擴展性(scalability)。

資料集的輸出

一旦得到資料集後,我們便可以透過自行定義的轉化函式,對其進行一連串的資料轉化(transformation),產生程式所需的新資料集。之後可以進一步的將轉化結果存放在檔案中,亦或是與其他的資料集進行合併(combine)。對一個資料集進行map的轉化範例如下:

val input = DataSet[String] = ...
val mapped = input.map { x => x.toInt }

以上的程式片斷,會將資料集input中的每一個元素轉換為整數後,產生一個新的資料集。

當完成了必要的資料集轉化後,我們可以將其結果寫入到HDFS或本機的檔案中,也可以將之列印出來。Flink中有所謂的Data sink,用來保存資料集並將之以我們所指定的方式,進行相對應的輸出。Flink中內建許多輸出格式,封裝於DataSet類別的函式中,以下介紹幾種資料集的輸出方式:

  • 將資料集中的每一個元素,透過各自的toString()函式取得相對應的字串,逐行寫入檔案中。
    def writeAsText(path: String, writeMode: WriteMode = WriteMode.NO_OVERWRITE)
    def writeAsCsv(
    filePath: String,
    rowDelimiter: String = "\n",
    fieldDelimiter: String = ',',
    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
    

    如函式名稱所示,writeAsText()將資料集以一般文字的格式寫入到指定檔案中;writeAsCsv()則是以CSV格式寫入。

  • 欲以自定義格式寫入檔案中,可以透過write函式。
    def write(outputFormat: FileOutputFormat[T],
    path: String,
    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
    
  • 將資料集內容輸出到執行該Flink程式的Task Manager的標準輸出。此時,我們必須要在log檔裏才能找到所輸出的內容,例如:在cluster的環境中,會輸出到Task Manager的 .out 檔案中。要注意的是,此法並不會觸發程式的執行。
    def printOnTaskManager()
    
  • 將資料集內容輸出到standard output或standard error(於啟動Flink執行的JVM中)。要注意的是,Flink 0.9.x 版本之前,此方法會將內容輸出到worker的log檔中,之後的版本則會將資料集內容傳送到用戶端並列印之。
    def print()
    def printErr()
    
  • 從 cluster 中將資料集內容傳送到本機 JVM,形成一個 List 物作。
    def collect()
    

    以上 print() 及 collect() 兩個函式都會觸發程式的執行,故無須也不能再額外呼叫 execute() 函式。這也是為什麼在上一篇的 word count 範例中,並沒有看到 execute() 這個函式。因為在範例程式的最後一行呼叫了 print() 函式,進而觸發程式的執行。此外,此兩個函式都會將資料集內容傳送到用戶端,故不適用於過大的資料集。目前來說,我們所能用 collect() 所取得的資料量,受限於我們的 RPC 系統,故官方不建議將此函式用於取得大於 10MB 的資料集。

    對於最後沒有呼叫 print() 或 collect() 函式的程式,就必須呼叫 ExecuteEnvironment 的 execute() 函式。至於會在本機端或 cluster 中執行程式,則是取決於上一篇最後所提到的 ExecutionEnvironment 類型。

    execute() 函式會回傳 JobExecutionResult 物件,包含執行時間及 accumulator 的結果;print() 和 collect() 函式則不會回傳 accumulator 的結果,若有需要可以從 getLastJobExectuionResult() 函式取得。

廣告

2 thoughts on “Flink DataSet API #2 – 資料集的輸入與輸出

  1. 補充一個蠻有趣正在進行中的 JIRA issue:
    https://issues.apache.org/jira/browse/FLINK-2239

    如 George 大文中所提,Flink 0.9.x 版以後,print() 會把 DataSet 結果都傳回 application client 輸出而非 task 的 JVM,但對於很大的 DataSet 而言這一傳,client 很容易 OOM。其實這個問題跟對 Spark 很大的 RDD 下 collect 是一樣的意思?

    似乎在 Flink 1.x 版後,會把這個流程改成以串流的方式傳回 client 並且 incremental print(),也就是說實際上不會在 client 端把 DataSet 實體化再 print 避免掉 OOM 問題。這或許是 Flink 本身屬於 streaming dataflow runtime 的優點之一?

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s