Flink DataSet API #3 – 資料集的轉化(1)

上一篇文章中,我們介紹了 Flink DataSet API 中,如何生成資料集以及如何輸出資料集。在這一篇文章,我們將會介紹以下幾個圍繞在資料集轉化相關的主題上:

  • 資料集轉化操作(Transformation Operations):首先將介紹 Flink 中所提供資料轉化(data transformations)的相關操作。
  • 鍵的指定(Specifying Keys):資料的轉化可以將一或多個資料集,透過我們自行定義的轉化函式,轉化為另一個新的資料集。而在資料集的轉化操作中,對於分群(grouping)、排序(sorting)或結合(joining)等,會需要指明該操作所依據的鍵(key),我們會在稍後介紹指定鍵的相關議題。
  • 傳遞函式給于 Flink(Passing Functions to Flink):接下來,我們介紹如何將自行定義的轉化函式傳遞給 Flink 以利後續的執行。
  • 傳遞參數給于函式(Passing Paramteres to Functions):如何將相關參數傳遞給前述的自定義轉化函式。

轉化操作(Transformations)

  • Map:將自行定義的一對一映射函式,應用於資料集內的每一個元素,並回傳映射的結果。
    data.map { x => x.toInt }
    
  • FlatMap:與Map相似,不同點在於 flat-map 映射函式為一對多。亦即資料集內的每一個元素,經過 flat-map 函式的作用後,映射結果可能為空值(none)或任意多個元素。
    data.flatMap { str => str.split(' ') }
    
  • MapPartition:將自定義的函式,應用於資料集的每一個並行分區(parallel partition)。此函式適用於當無法各別轉化資料集中的個別元素,且無須針對元素進行分群時。若欲轉化資料集中的每一個元素,Map及 FlatMap 較適用。以下範例將data這個資料集的各並行分區中之元素,映射為Tuple (元素值, 1)。
    data.mapPartition { in => in map { (_, 1) }}
    
  • Filter:將自定義的過濾函式應用於資料集的每個元素,並保留函式回傳值為真的元素。
    data.filter { _ > 1000 }
    
  • Reduce:透過自定義的函式,將資料集中的元素進行兩兩合併,直到僅剩下一個元素為止。可以用於整個資料集或是分群的資料集。
    data.reduce { _ + _ }
    
  • ReduceGroup:透過自定義的函式,將資料集中同一群的元素合併為一或多個元素。ReduceGroup 可以被應用於整個資料集或一個已分群的資料集。
    data.reduceGroup { elements => elements.sum }
    
  • Aggregate:將一群數值聚合成為一個數值。我們可以將此處的聚合函式想像為內建的 reduce 函式,可以應用於整個資料集或一個已分群的資料集。
    val input: DataSet[(Int, String, Double)] = // [...]
    val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
    

    我們也可以針對最小值,最大值及加總這三個聚合方式,採用以下的速記語法:

    val input: DataSet[(Int, String, Double)] = // [...]
    val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
    
  • Distinct:針對元素的所有或部份欄位,取得資料集中欄位值不重覆的元素。
    data.distinct()
    
  • Join:兩個資料集 join 成為一個,回傳所有指定鍵相等的元素對(element pairs)。欲指定 join 時所依據的鍵,則必須使用 where() 及 equalTo() 函式。若有需要,可以自行定義一個 JoinFunction 來將 join 後的元素對轉換為單一元素;或是定義一個 FlatJoinFunction 將元素對轉換為任意多個(包含 0 個)元素。
    // In this case tuple fields are used as keys. "0" is the join field on the first tuple
    // "1" is the join field on the second tuple.
    val result = input1.join(input2).where(0).equalTo(1)
    

    此外,我們也可以透過 Join Hints 來指定 Join 的執行策略,我們可以選擇 partitioned join 或是 broadcast join 以及是否要使用 sort-based 或 hash-based 演算法。若無指定 Join Hints,Flink將會試著去評估輸入資料集的大小,並據以挑選一個最佳的 Join 執行策略。

    // This executes a join by broadcasting the first data set
    // using a hash table for the broadcasted data
    val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
                       .where(0).equalTo(1)
    
  • OuterJoin:在兩個資料集上執行 left/right/full outer join。Outer join 與一般的(inner)join 相似,會建立鍵相等的元素對(pairs of elements)。此外,位於外部(outer)資料集(左側,右側或兩者)的資料記錄會被保留下來,即便在另一資料集中找不到與之相符的鍵。所給定的 JoinFunction 會負責尋找鍵相符的元素,並將該元素對轉化為單一元素;或由 FlatJoinFunction 將鍵相符的元素對,轉化為任意多個(包含 0 個)元素。
    val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
      (left, right) =>
        val a = if (left == null) "none" else left._1
        (a, right)
    }
    
  • CoGroup:我們可以將協同分組(CoGroup)視為是 reduce 操作應用於兩個資料集情況下的變形。亦即先將輸入資料集根據一或多個欄位,對其資料記錄進行分組,再對各組進行 join 的動作。
    data1.coGroup(data2).where(0).equalTo(1)
    
  • Cross:為兩個資料集建立笛卡爾積(Cartesian product),產生所有的元素對。我們也可以透過 CrossFunction 來將元素對進一步轉化為單一元素。
    val data1: DataSet[Int] = // [...]
    val data2: DataSet[String] = // [...]
    val result: DataSet[(Int, String)] = data1.cross(data2)
    

    注意:Cross 是計算密集(compute-intensive)的操作,即使對於擁有龐大運算資源的 cluster 也非常具有挑戰性。建議透過 crossWithTiny() 或 crossWithHuge() 函式通知 Flink 有關資料集的大小。

  • Union:產生兩資料集的聯集。
    data.union(data2)
    
  • Rebalance:讓一個資料集中的資料均勻分散於各分區中,以消除各分區在資料量上的偏態(skewness)。只有與 map 相似的轉化操作可以用於 rebalance 後的結果。
    val data1: DataSet[Int] = // [...]
    val result: DataSet[(Int, String)] = data1.rebalance().map(...)
    
  • Hash-Partition:根據所指定的鍵進行 hash 分區操作。鍵的指定可以透過所謂的 key-selector 函式、Tuple 位置,或 case class 中的欄位名稱。
    val in: DataSet[(Int, String)] = // [...]
    val result = in.partitionByHash(0).mapPartition { ... }
    
  • Range-Partition:根據所指定的鍵進行 range 分區操作。鍵的指定可以透過所謂的 key-selector 函式、Tuple 位置,或 case class 中的欄位名稱。
    val in: DataSet[(Int, String)] = // [...]
    val result = in.partitionByRange(0).mapPartition { ... }
    
  • Custom Partitioning:自行定義對資料集的切分方法,所指定的鍵僅能具有單一欄位。
    val in: DataSet[(Int, String)] = // [...]
    val result = in.partitionCustom(partitioner: Partitioner[K], key)
    
  • Sort Partition:在資料集的各分區中,以指定的欄位及排序方式,對分區中的資料進行排序。欄位的指定可以透過 Tuple 位置或欄位名稱的方式。若欲針對多個欄位進行排序,可以串接多個 sortPartition() 函式來達成。
    val in: DataSet[(Int, String)] = // [...]
    val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
    
  • First-n:回傳資料集中前 n 個元素。可以用於一般資料集、分組資料集或分組已排序資料集上。鍵的指定可以透過 key-selector 函式、Tuple 位置或 case class 中的欄位名稱。
    val in: DataSet[(Int, String)] = // [...]
    // regular data set
    val result1 = in.first(3)
    // grouped data set
    val result2 = in.groupBy(0).first(3)
    // grouped-sorted data set
    val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
    

以下的轉化操作可以在 Tuples 所構成的資料集上使用:

  • MinBy / MaxBy:從一群 Tuples 中挑選出一個 Tuple,該 Tuple 的一或多個欄位值具有極小(大)值。被用來進行比較的欄位必須具備可比性(comparable)。若同時有多個 Tuples 滿足極小(大)條件,則會隨機回傳其中一個 Tuple。MinBy(MaxBy)可以用於一般資料集或已分組資料集上。
    val in: DataSet[(Int, Double, String)] = // [...]
    // a data set with a single tuple with minimum values for the Int and String fields.
    val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
    // a data set with one tuple for each group with the minimum value for the Double field.
    val out2: DataSet[(Int, Double, String)] = in.groupBy(2)
                                                 .minBy(1)
    

指定轉化操作的鍵(Specifying Keys)

有些資料集轉化操作(join, coGroup, keyBy, groupBy 等),需要一個定義於資料集上的鍵(key);有些資料集轉化操作(Reduce, GroupReduce, Aggregate, Windows 等),則允許在操作執行前,該資料集已根據某鍵完成分組(grouping)。一個資料集可以透過以下方式完成分組:

val input = DataSet[...] // [...]
val reduced = input
        .groupBy(/*define key here*/)
        .reduceGroup(/*do something*/);

Flink 的 data model 並不是立基於鍵值對(key-value pair),所以我們並不需要真的將資料集包裝為具有鍵值的形態。Keys 是虛擬的:它們只是定義於實際資料上的函式,引導分組操作的進行。在這一節中,我們將介紹定義鍵的方式。

為 Tuple 資料指定鍵(Define Keys for Tuples)

最簡單的情況就是,根據一或多個欄位,為 Tuple 所構成的資料集進行資料分組。以下是一個範例,輸入資料集根據其 Tuple 形式的資料中的第一個欄位進行分組,隨後的 GroupReduce 函式將會收到分組的結果,同一組 Tuples 的第一個欄位值皆相同。

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input
  .groupBy(0)
  .reduceGroup(/*do something*/)

以下是另一個複合鍵(composite keys)的範例,示範根據前兩個欄位進行分組的狀況。

val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input
  .groupBy(0,1)
  .reduce(/*do something*/)

注意:當資料集是由如下的巢狀式 tuples 形式所構成時

val ds: DataSet[((Int, Float), String, Long)]

groupBy(0) 將會以整個 (Int, Float) 為鍵,若想要取得該鍵中的資訊,我們就必須使用欄位名稱做為鍵。

使用欄位表達式做為鍵(Define Keys Using Field Expressions)

我們可以使用欄位表達式來參照巢狀式欄位,並定義鍵以進行 grouping、sorting、joining 或 coGrouping。欄位表達式可以讓我們很方便的在 Tuple 或 POJO 類型的巢狀複合式物件中選擇欄位;此外,欄位表達式可以被用來定義語義函式標注(semantic function annotations)。
在以下的範例中,WC 是一個具有兩個欄位(word 和 count)的 POJO。欲依據 word 欄位進行分組,我們僅需將欄位名稱給于 groupBy() 函式即可。

// some ordinary POJO (Plain old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0L) }
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce(/*do something*/)
// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce(/*do something*/)

欄位表達式語法:

  • 用欄位名稱選擇 POJO 類型的欄位。如上例中的 “word"。
  • 用欄位名稱(以 1 為起始值)或索引值(以 0 為起始值)選擇 Tuple 中的欄位。例如:_1 和 5 分別代表 Tuple 中的第一個和第六個欄位。
  • 選擇位於 Tuple 或 POJO 中的巢狀式欄位。例如:user.zip 代表 user 這個 POJO 中的 zip 欄位。此外,Flink 也支持選擇 Tuple 與 POJO 的混合巢狀式欄位。例如:"_2.user.zip" 或 “user._4.1.zip"
  • 用萬用符號("_")代表所有型態,可以用於非 Tuple 及 POJO 以外的型態。

我們來看以下這個較完整的範例,並據以示範幾個欄位表達式的例子:

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}
class ComplexNestedClass(
    var someNumber: Int,
    someFloat: Float,
    word: (Long, Long, String),
    hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}

對上例而言,以下這些欄位表達式皆合法:

  • count:在 WC 中的 count 欄位。
  • complex:遞迴的選擇具有 ComplexNestedClass 型態的 complex 物件中,所有的欄位。
  • complex.word._3:選擇巢狀式 Tuple3 中最後一個欄位。
  • complex.hadoopCitizen:選擇 Hadoop 的 IntWritable 類型。

透過鍵選擇器函式(key selector function)定義鍵

另外一個定義鍵的方式是透過鍵選擇器函式,這類函式接受資料集的一筆資料做為輸入參數,並回傳該資料的鍵。該鍵可以是任何型態,以及從任意運算所產生。以下是一個鍵選擇器函式的範例,該函式很簡單的回傳一個物件的欄位。

// some ordinary case class
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val wordCounts = words
  .groupBy( _.word ).reduce(/*do something*/)

將自訂義函式傳遞給 Flink

在進行資料集轉化操作時,我們可能會需要自行定義函式,並將其傳遞給 Flink。這一節中介紹幾個達成該目的的方式。

  • Lambda 函式:如同先前的幾個範例中所示,所有的轉化操作都可接受 Lambda 函式。
    val data: DataSet[String] = // [...]
    data.filter { _.startsWith("http://") }
    
    val data: DataSet[Int] = // [...]
    data.reduce { (i1,i2) => i1 + i2 }
    // or
    data.reduce { _ + _ }
    
  • Rich 函式:所有可接受 Lambda 函式做為輸入參數的轉化操作,都可以改用 Rich 函式做為輸入參數。例如:
    data.map { x => x.toInt }
    

    我們可以將上例用以下 Rich 函式改寫:

    class MyMapFunction extends RichMapFunction[String, Int] {
      def map(in: String):Int = { in.toInt }
    }
    
    data.map(new MyMapFunction())
    

    此外, Rich 函式也可以採用如下匿名函式的方法來定義:

    data.map (new RichMapFunction[String, Int] {
      def map(in: String):Int = { in.toInt }}
    )
    

除了使用者可以自行定義的函式(map、reduce等)外,Rich 函式額外提供了四個函式:open、close、getRuntimeContext 以及 setRuntimeContext。以下簡述其各自的功用:

  • open:可視為 Rich 函式的 constructor,用於初始化,預設為空函式。會在操作函式(如:map 或 join)開始運作之前被呼叫,故適合將一次性的初始化工作放置於此。若 Rich 函式被包含在迴圈中,則此函式將在每次迭代開始時被觸發。
  • close:可視為 Rich 函式的 distructor,用於 Rich 函式結束前的清理工作。會在操作函式(如:map 或 join)運作結束後被呼叫。若 Rich 函式被包含在迴圈中,則此函式將在每次迭代結束前被觸發。
  • getRuntimeContext:可取得包含 UDF runtime 資訊(如:Accumulators及DistributedCache等)的 context 物件。
  • setRuntimeContext:可設置 Rich 函式的 runtime context,當 framework 欲建立並行的 Rich 函式時會呼叫此函式。
廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s