Flink DataSet API #3 – 資料集的轉化(2)

在這篇文章中,我們將接續上一篇文章,介紹以下主題:

  • 傳遞參數給于函式(Passing Paramters to Functions):如何將相關參數傳遞給前述的自定義轉化函式。
  • 語義標注(Semantic Annotations):適當的為資料轉化操作使用語義標注,將可以使其節省一些不必要的資料移動或排序,進而提昇執行效率。我們將會介紹如何在 Flink 中使用語義標注。
  • 累加器及計數器(Accumulators and Counters):累加器(accumulators)是一個具有加法運算子和最終累加結果的簡單構造,可以在應用程式中的工作結束後,取得工作結果。對除錯(debug)及資料的初期觀察以發現進一步資訊時非常有用。
  • 除錯(Debugging):我們將介紹幾個在 Flink 中,能有效簡化資料分析應用程式開發的除錯方法。

傳遞參數給于函式(Passing Parameters to Functions)

要將參數傳遞給一個函式物件,可以經由該函式物件的建構式(constructor),或是經由 withParameters(Configuratioin) 函式。這些參數會被序列化(serialize)成為函式物件的一部份,並傳送給所有的並行實例(parallel task instance)。

  • 經由建構式
val toFilter = env.fromElements(1, 2, 3)
toFilter.filter(new MyFilter(2))
class MyFilter(limit: Int) extends FilterFunction[Int] {
  override def filter(value: Int): Boolean = {
    value > limit
  }
}
  • 經由 withParameters(Configuration)

這個函式接受一個 Configuration 物件做為參數,此參數將會被傳遞給 Rich 函式的 open() 函式。Configuration 物件實質上是一個以字串為鍵,任意型態為值的映射。以下是一個 filter 的範例:

val toFilter = env.fromElements(1, 2, 3)
val conf = new Configuration()
conf.setInteger("limit", 2)
toFilter.filter(new RichFilterFunction[Int]() {
    var limit = 0

    override def open(config: Configuration): Unit = {
      limit = config.getInteger("limit", 0)
    }

    def filter(in: Int): Boolean = {
        in > limit
    }
}).withParameters(conf)
  • 全域參數

Flink 也允許我們傳遞自定義的 configuration 值,給于程式執行環境中的 ExecutionConfig 介面。由於 ExecutionConfig 可供所有 Rich 函式存取,故所有函式都能存取其中的自定義 configuration 值。
首先,我們必須先設置自定義的 configuration:

val env = ExecutionEnvironment.getExecutionEnvironment
val conf = new Configuration()
conf.setString("mykey", "myvalue")
env.getConfig.setGlobalJobParameters(conf)

除了以上方法,也可以自行定義一個繼承自 ExecutioinConfig.GlobalJobParameters 類別,並將之做為全域工作參數(global job parameters)傳遞給 ExecutionConfig。我們也可以實現其中的 Map<String, String> toMap() 成員方法,它可以用來在 Web 前端顯示使用者設置值。
接著是從全域設置中取出所需的值。位於全域工作參數中的物件,在系統的許多地方都可以存取。而且,所有實現 Rich∗Function 的使用者定義函式,都可透過 runtime context 存取它們。

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private String mykey;
    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
      Configuration globConf = (Configuration) globalParams;
      mykey = globConf.getString("mykey", null);
    }
    // ... more here ...

語義標注(Semantic Annotations)

語義標注可以用來給于 Flink 關於一個函式的行為提示。它們告訴系統,哪些輸入參數是欲讀取並進行運算的輸入,哪些輸入參數不經修改直接輸出。語義標注對於執行的加速是非常有力的工具,因為它們允許系統推論在多個運算(operations)之間,重覆使用排序次序(sort orders)或分區(partitions)。換句話說,在 Flink 應用程式中適當的使用語義標注,將可以使其節省一些不必要的資料移動或排序,進而提昇執行效率。

我們可以選擇是否要使用語義標注,要注意的是,在使用語義標注時,必須要非常小心且趨於保守。不正確或不適當的語義標注,將會使系統對應用程式基於不正確的假設,做出錯誤的推理,最終很有可能會導致錯誤的執行結果。因此,當一個運算子(operator)的行為無法被預期時,就不應該對其使用語義標注。

目前 Flink 中支持以下三種語義標注:

  • 轉發欄位標注(Forwarded Fields Annotations)
  • 非轉發欄位標注(Non-forwarded Fields Annotations)
  • 讀取欄位標注(Read Fields Annotations)

轉發欄位標注(Forwarded Fields Annotations)

所謂的轉發欄位,為一個函式的輸入欄位,該欄位未經改變而被轉發做為此函式輸出的一部份。此資訊被優化器(optimizer)用來推論一個資料屬性(如:排序、分區)是否被一個函式所保留。

轉發欄位的資訊是使用欄位表達式來指定的。而將被轉發至函式輸出中的相同位置(position)時,可以採用位置來指定,且所指定的位置,在輸入及輸出的資料型態(data type)中必須是有效位置,並具有相同型態。例如:"f2″ 這個字串,代表了一個 Java tuple 輸入的第三個欄位,轉發至 Java tuple 輸出的第三個欄位。

以上簡要說明了輸入與輸出位置相同的欄位轉發。若要進行相異位置的欄位轉發,則必須指定於輸入中的來源欄位及輸出中的目標欄位,以作為欄位表達式。例如:"f0->f2″ 這個欄位表達式,表示將 Java tuple 輸入的第一個欄位,轉發至 Java tuple 輸出的第三個欄位。而萬用字元 “∗" 可以用來表達全體輸入或輸出,例如:"f0->∗" 代表僅輸出 Java tuple 輸入的第一個欄位。

若欲轉發多個指定欄位時,我們可以將各個轉發欄位表達式以分號進行分隔,例如:"f0; f2->f1; f3->f2″;或將之拆分為多個欄位表達式,如:"f0″, “f2->f1″, “f3->f2″。

欄位轉發資訊可以透過以下兩種方式來宣告:

  • 函式類別標注(Function Class Annotations)
    • @ForwardedFields:用於僅有單一輸入參數的函式,如:Map、Reduce 等。
    • @ForwardedFieldsFirst:轉發第一個欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。
    • @ForwardedFieldsSecond:轉發第二個欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。
  • 作為參數傳遞(Operator Arguments)
    • data.map(myMapFunc).withForwardedFields():用於僅有單一輸入參數的函式,如:Map、Reduce 等。
    • data1.join(data2).where().equalTo().withForwardedFieldsFirst():轉發第一個欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。
    • data1.join(data2).where().equalTo().withForwardedFieldsSecond():轉發第二個欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。

要特別注意的是,若我們以類別標注的方式宣告欄位轉發資訊,便無法以參數傳遞的方式將欄位轉發資訊覆寫(overwrite)。

以下的簡單範例展示了以函式類別標注方式,宣告欄位轉發資訊:

@ForwardedFields("_1->_3")
class MyMap extends MapFunction[(Int, Int), (String, Int, Int)]{
   def map(value: (Int, Int)): (String, Int, Int) = {
    return ("foo", value._2 / 2, value._1)
  }
}

非轉發欄位標注(Non-forwarded Fields Annotations)

非轉發欄位資訊說明了哪些欄位不會被保留在一個函式輸出中的相同位置上;同時,所有其餘欄位的值都將被認為必須被留在輸出中的相同位置上。因此,非轉發欄位標注是轉發欄位標注的反向思維。要特別注意的是,一旦使用了非轉發欄位標注,必須確保所有的非轉發欄位都已經宣告。因為所有其他的欄位都將被視為輸入與輸出相同位置的轉發欄位。

與轉發欄位的資訊相同,非轉發欄位的資訊是使用欄位表達式來指定的。我們可以將各個非轉發欄位表達式以分號進行分隔,例如:"f1; f3″;或將之拆分為多個欄位表達式,如:"f1″, “f3″,表示除了 Java tuple 輸入中的第二及第四個欄位外,其餘的欄位都被保留在 Java tuple 輸出中的相同位置上。此外,我們只可以為輸入與輸出具有完全相同型態的那些函式,宣告非轉發欄位標注。

非轉發欄位標注可以透過以下幾個方式指定:

  • @NonForwardedFields:用於僅有單一輸入參數的函式,如:Map、Reduce 等。
  • @NonForwardedFieldsFirst:第一個欄位為非轉發欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。
  • @NonForwardedFieldsSecond:第二個欄位為非轉發欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。

以下的簡單範例展示了宣告非轉發欄位資訊:

@NonForwardedFields("_2") // second field is not forwarded
class MyMap extends MapFunction[(Int, Int), (Int, Int)]{
  def map(value: (Int, Int)): (Int, Int) = {
    return (value._1, value._2 / 2)
  }
}

讀取欄位標注(Read Fields Annotations)

我們透過讀取欄位標注,宣告會被函式存取、估值或參與計算的那些欄位。要特別注意的是,一旦使用了讀取欄位標注,必須確保所有的讀取欄位都已經宣告。我們可以將各個讀取欄位表達式以分號進行分隔,例如:"f1; f3″;或將之拆分為多個欄位表達式,如:"f1″, “f3″,表示 Java tuple 輸入中的第二及第四個欄位,將在函式中被讀取並估值。此外,我們只可以為輸入與輸出具有完全相同型態的那些函式,宣告非轉發欄位標注。

讀取欄位標注可以透過以下幾個方式指定:

  • @ReadFields:用於僅有單一輸入參數的函式,如:Map、Reduce 等。
  • @ReadFieldsFirst:第一個欄位為讀取欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。
  • @ReadFieldsSecond:第二個欄位為讀取欄位。用於有兩個輸入參數的函式,如:Join、CoGroup 等。

以下的簡單範例展示了宣告讀取欄位資訊:

@ReadFields("_1; _4") // _1 and _4 are read and evaluated by the function.
class MyMap extends MapFunction[(Int, Int, Int, Int), (Int, Int)]{
   def map(value: (Int, Int, Int, Int)): (Int, Int) = {
    if (value._1 == 42) {
      return (value._1, value._2)
    } else {
      return (value._4 + 10, value._2)
    }
  }
}

累加器及計數器(Accumulators and Counters)

累加器(accumulators)是一個具有加法運算子和最終累加結果的簡單構造,可以在應用程式中的工作結束後,取得工作結果。一個最直覺的累加器就是計數器(counter),我們可以使用 Accumulator.add(V value) 來進行累加。在工作的尾聲,Flink 會加總/合併所有的局部結果(partial results),生成最終結果並傳送給用戶端(client)。累加器在除錯(debug)及資料的初期觀察以發現進一步資訊時非常有用。

Flink 目前有以下幾個內建的累加器,其中每一個都實作了 Accumulator 這個界面(interface):

  • IntCounter、LongCounter 以及 DoubleCounter:請參閱後續累加器的使用說明及範例。
  • Histogram:Histogram 累加器以分佈(distribution)的手法,產生構成直方圖(histogram)的資訊。我們可以使用它來計算數值的分佈(distributions of values),例如:在一個 word count 的應用程式中,計算行字數(words-per-line)的分佈。

如何使用累加器(How to use accumulators)

欲使用累加器,可以依循以下步驟:

  1. 創建一個累加器物件:在我們想要使用累加器的運算子函式中,創建一個適當的累加器物件(以下範例為 counter)。其運算子函式指的是,以(匿名)類別實作運算子的使用者自定義函式,請參閱傳遞函式給于 Flink 一節。
    private IntCounter numLines = new IntCounter();
    
  2. 註冊所創建的累加器物件:在創建累加器物件後,我們必須為該累加器命名,並於 RuntimeContext 中以指定名稱進行註冊。一般而言,註冊動作通常放在運算子函式的 open() 中。
    getRuntimeContext().addAccumulator("num-Lines", this.numLines);
    
  3. 使用累加器:在完成上述的註冊後,我們就可以在運算子函式的任何地方,包括 open() 及 close() 中,使用該累加器。
    this.numLines.add(1)
    
  4. 取得累加器中的最終結果:最終的總體結果會被存放在 JobExecutionResult 物件中,該物件會在使用 Java API 運行工作(job)時被回傳,且目前僅於會等待工作完成的那些情境下能運作。
    myJobExecutionResult.getAccumulatorResult("num-Lines")
    

每一個工作(job)中的所有累加器,共享一個命名空間(namespace)。因此,我們可以在一個工作的不同運算子函式中,使用相同(名稱)的累加器,而 Flink 最後會將所有同名累加器的結果進行合併。在這裏要注意的是,截至目前為止,累加器的最終結果只能在工作結束後才能取得。而 Flink 開發團隊也有規劃,讓前一次迭代所衍生的結果,能在下一次迭代中取得。我們可以使用聚合器(Aggregators)來計算各次迭代的統計量,並基於這些統計量得到最終迭代結果。

自定義累加器(Custom Accumulators)

如前所述,Flink 內建的累加器都實作了 Accumulator 這個界面,欲自行定義累加器亦然。我們可以選擇實作以下兩個類別之一:Accumulator 或 SimpleAccumulator。其中,Accumulator<V, R> 是最有彈性的,V 代表 Value、R 代表 Result,它為欲進行累加的值定義了一個型態 V,並為其結果定義了一個型態 R,且 V 和 R 的型態未必相同。例如:對一個 histogram 而言,V 是一個數字,R 則是一個 histogram。反之,SimpleAccumulator 顧名思義是一個簡單的累加器,也是 Accumulator 的一個特例,專為 V 和 R 具備相同型態的情境所使用,例如:計數器。

除錯(Debugging)

在一個分散式 cluster 中執行分析程式處理大資料集之前,我們最好先一步確認所實現的演算法能如預期般的運作。因此,實作資料分析應用程式通常採用迭代漸進的方式,其可能包含檢查結果、除錯、改善等步驟。這一節中,我們將介紹幾個在 Flink 中,能有效簡化資料分析應用程式開發的除錯方法。

本地執行環境(Local Execution Environment)

一個 LocalEnvironment 能在其所在的 JVM 中啟動一個 Flink 系統。如果我們在 IDE 中啟動 LocalEnvironment,就可以在應用程式原始碼中設置中斷點(breakpoints),以利進行除錯。以下是建立 LocalEnvironment 的範例:

env = ExecutionEnvironment.createLocalEnvironment()
val lines = env.readTextFile(pathToFile)
// build your program

env.execute();

Collection 的資料來源及接收器(Collection Data Sources and Sinks)

為一個分析程式建立檔案提供輸入,並讀取檔案檢查其輸出,有時是很難處理的。Flink 提供了特殊的 data sources 和 data sinks,支持 Java Collections 來簡化測試的過程。一旦應用程式通過測試後,我們可以很容易的將 data sources 和 sinks,取代為實際應用情境下所使用的 data sources 和 sinks,例如:HDFS。以下是一個簡單的範例:

val env = ExecutionEnvironment.createLocalEnvironment()

// Create a DataSet from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataSet from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataSet from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意:就當下而言,Collection 的資料來源必須實作 Serializable,而且無法支持平行執行(即平行性為 1)。

廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s