「Flink Forward 2015 重點整理」:DataStream API Hands-On #1

 

 

此議程為 Flink Forward 2015 的 Flink Training Session 第一堂議程,主要介紹透過 Flink DataStream API 撰寫即時串流運算的叢集程式,適合的對象為幾乎沒有寫過其他例如 Storm、MapReduce 或者 Spark 等分散式運算框架應用程式的初學者。對於已經有類似經驗的人,可以直接快速瀏覽 1、2、4、6 即可,這幾個地方跟其他運算框架有些微的不同。

 

教學範例完整程式:Socket Incremental Wordcount


整個教學過程環繞常見的 WordCount 範例應用,從 socket 文字資料串流即使的計算每個字的出現次數,以每5秒為 aggregation window。本文將一一條列影片中講者所提到的重點。

以下為 main method 完整程式碼:

public static void main(String[] args) throws Exception {
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String,Integer>> counts = env
        // read stream of words from socket
        .socketTextStream(localhost, "9999")
        // split up the lines in tuple containing
        .flatMap(new Splitter())
        // group by the tuple field "0"
        .groupBy(0)
        // keep the last 5 minutes of data
        .window(Time.of(5, TimeUnit.Minutes))
        // sum up tuple field "1"
        .sum(1);

    // print results in command line
    counts.print()
    // execute program
    env.execute("Socket Incremental Wordcount Example")
}

其中,Splitter 類別屬於使用者自行定義的 User Function,下文將會有更詳細的介紹。影片中沒有 Splitter 的範例程式,但大致上的程式碼為以下:

public class Splitter implements FlatMapFunction<String,Tuple2<String,Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for(String word : s.split("\\w+"))
            collector.collect(new Tuple2(word,1));
    }

 

重點 #1:透過 API 獲取 StreamExecutionEnvironment 物件(影片 02:15 處)


public static void main(String[] args) throws Exception {

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ...
}

每一個串流運算程式開頭都必須透過 API 獲得一個 StreamExecutionEnvironment 物件。此物件在程式中代表著「與實體 Flink 叢集的連線」,所有需要對實體叢集下達的指令都必須透過該物件設定,包含設定資料串流的源頭(Data Source)與終點(Data Sink)、開始執行串流程式等。

StreamExecutionEnvironment 物件獲得得方式為透過 StreamExecutionEnvironment 類別的 getExecutionEnvironment 靜態方法。依據此處所述,getExecutionEnvironment() 會依據當下程式執行之環境(standalone、IDE、實體叢集)回傳相對應的 StreamExecutionEnvironment。若程式執行於 standalone 模式或者於 IDE 中執行,則回傳的為 local environment,而若將程式封包於 JAR 擋並提交給實體的 Flink 叢集執行,則為回傳 remote environment。亦可透過 createLocalEnvironment() 或 createRemoteEnvironment() 來指定獲得 StreamExecutionEnvironment,否則直接透過 getExecutionEnvironment() 即可。

 

重點 #2:透過 StreamExecutionEnvironment 加資料串流源頭(影片 03:03 處)


public static void main(String[] args) throws Exception {
    ...

    DataStream<Tuple2<String,Integer>> counts = env
        // read stream of words from socket
        .socketTextStream(localhost, "9999")
        // split up the lines in tuple containing
        .flatMap(new Splitter())
        // group by the tuple field "0"
        .groupBy(0)
        // keep the last 5 minutes of data
        .window(Time.of(5, TimeUnit.Minutes))
        // sum up tuple field "1"
        .sum(1);

    ...
}

透過 env 我們可以設定整個運算的資料串流源(Data Source),Flink支援各種串流資料源,包含簡單的字串或者 socket ,或者透過 Flink Stream Connector Library 亦可串接 Apache KafkaRabbitMQ 等 message buffer。

影片中的這個範例則是使用了基本的 socket 串流資料源,接收 localhost:9999 的 socket 的純文字串流,而我們可以在 terminal 中透過 ‘nc -l 9999’ 指令來開啟此socket 後輸入文字來模擬此資料串流。

值得一提的是,在 Flink 程式定義資料串流源之後,並不會真的開始讀取串流源的資料,而是等到 StreamExecutionEnvironment.execute() 被呼叫後才會真正開始讀取資料,原因是 Flink 應用程式都屬於 lazy computation,程式中會連續的定義對資料的多次「轉變(transformation)」後,一直到有第一次的真正運算結果指令才會把之前累積的所有轉變規劃成實體叢集的運算工作並且實際開始執行。這個概念與 Apache Spark 的 RDD DAG 行成與 lazy computation 十分相似(兩者對於 lazy computation 處理方式之差異,可以參考這篇討論串)。

 

重點 #3:對資料串流源定義一系列的 Transformation(影片 04:18 處)


public static void main(String[] args) throws Exception {
    ...

    DataStream<Tuple2<String,Integer>> counts = env
        // read stream of words from socket
        .socketTextStream(localhost, "9999")
        // split up the lines in tuple containing
        .flatMap(new Splitter())
        // group by the tuple field "0"
        .groupBy(0)
        // keep the last 5 minutes of data
        .window(Time.of(5, TimeUnit.Minutes))
        // sum up tuple field "1"
        .sum(1);

    ...
}

上方這段程式碼為資料串流源接續的一連串「轉變(transformation)」的定義,包含flatMap、groupBy、window、sum 等。如前段所述,這些轉變實際上在呼叫時不會有實際的運算執行,而是會形成一個由 JobGraph 代表著得執行計畫(execution plan),待 StreamExecutionEnvironment.execute() 被呼叫後才會把這個 JobGraph 遞交給實體叢集進行運算。這部分詳細的說明已超出本文範圍,會由之後其他文章再詳述。

 

重點 #4:FlatMap 中的 User Function(影片 05:51、10:05 處)


public static void main(String[] args) throws Exception {
    ...

    DataStream<Tuple2<String,Integer>> counts = env
        // read stream of words from socket
        .socketTextStream(localhost, "9999")
        // split up the lines in tuple containing
        .flatMap(new Splitter())
        // group by the tuple field "0"
        .groupBy(0)
        // keep the last 5 minutes of data
        .window(Time.of(5, TimeUnit.Minutes))
        // sum up tuple field "1"
        .sum(1);

    ...
}

第一個轉變為 flatMap,於此處屬於一個 by-each-entry 的轉變,意指會在串流資料源每輸出一筆資料就對這筆資料進行這個 flatMap 運算。相反的,若在執行 flatMap 之前有先對串流資料源定義 window 的轉變,則 flatMap 轉變會對於該 window 內累積的資料進行小批次的轉變運算。Flink 就是以這種方式達到 true-streaming 與 micro-batch streaming 可以並存於同一個運算框架之上。對於這部分會在其他篇有更詳細的介紹。

另一點值得注意的是,Splitter 為一個使用者定義的 User Function,當成 flatMap 的參數。實際上,Splitter 為一個實現 FlatMapFunction 的類別,如下所示:

public class Splitter implements FlatMapFunction<String,Tuple2<String,Integer>> {

    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        for(String word : s.split("\\w+"))
            collector.collect(new Tuple2(word,1));
    }
}

程式碼本身應該就足以解釋這個 FlatMapFunction 的作用了:對於收到的每一個字串 s,以空白字元切割,並且將每切割出來的字輸出成一個 (word,1) K-V。

FlatMapFunction 需要指定兩個 generic 類別,而這邊指定的為 String 與 Tuple2<String,Integer>,即為 FlatMapFunction<String,Tuple2<String,Integer>>,代表著每一次 flatMap 被執行時,預計接收 String 類別的物件,且輸出為 Tuple2<String,Integer>。Tuple2 類別代表著多個值的有序集合,最多可以到 25 個值(Tuple25),而由於這個範例 flatMap 輸出的為 (word,1),類別組合定義則依序為 <String,Integer>。Tuples 於影片 16:43 處有更詳細的介紹。

可以看到,flatMap 提供了一個 Collector API 介面讓 Flink 應用程式可以在 flatMap 中的任何地方輸出新的值。由於 flatMap 為一個輸入對應多個輸出(無輸出、一個輸出、或者多個輸出), flatMap 中可於多處進行輸出。相對的,若為其他類型的轉變,則依據該類型轉變特性提供的 API 就會不同。影片 16:58 處有另外舉例 MapFunction 與 Filter 的轉變 User Function 類別。

 

重點 #5:以 groupBy 定義資料串流的 Partitioning (影片 23:14 處)


public static void main(String[] args) throws Exception {
    ...

    DataStream&lt;Tuple2&lt;String,Integer&gt;&gt; counts = env
        // read stream of words from socket
        .socketTextStream(localhost, "9999")
        // split up the lines in tuple containing
        .flatMap(new Splitter())
        // group by the tuple field "0"
        .groupBy(0)
        // keep the last 5 minutes of data
        .window(Time.of(5, TimeUnit.Minutes))
        // sum up tuple field "1"
        .sum(1);

    ...
}

由於 Flink 是一個分散式運算框架,為了要達到 scale-out 的彈性,每一個轉變(flatMap、map、filter等)實質上會有多個分散於多個運算節點的執行緒在執行該轉變的運算。這些執行緒被稱為該轉變的 instance,而從一個轉變所輸出的資料筆,若不沒有任何特別指定定義的話,會隨機被分配到下一個轉變的任何一個 instance 進行下一個階段的轉變運算。這樣的運算流程對於大多數運算將會導致錯誤的結果。以這個範例的 streaming wordcount 為例,第一個轉變為將每一行字轉變成多個 (word,1) 的 Tuple2<String,Integer>,接著的下一個轉變為結算每一個字出現的次數。若每一個 (word,1) 是隨機分配到第二個轉變的任何一個 instance,最終算出來的結果勢必會是錯的,我們需要指定同一個 word 的 Tuple2 都同一個 instance 才會正確。這個定義稱為 partitioning,與 MapReduce 的 intermediate key shuffling 非常相似。

Flink 定義 partitioning 的方式非常多樣,上面這一段程式的 groupBy 為最基本的方式,以「tuple 中的第幾個值」當成 partitioning 依據,同一個值的都會被分配到同一個 instance。其他方式包含 Java POJO field partitioning、derivative computation partioning,會在這一系列 Hands-On 影片的 DataStream Hands-On #2 做介紹。

 

重點 #6:整個 Execution Plan 的 Data Sink (影片 29:32 處)


 
public static void main(String[] args) throws Exception { 
    ... 

    // print results in command line
    counts.print()
    // execute program
    env.execute("Socket Incremental Wordcount Example") 
}

在資料串流源接著一系列連續的轉變的最後是定義這個 execution plan 的資料輸出槽(Data Sink),為所有運算結果流向的終點。Flink community 目前已經實做了多種 data sink connector,包含 KafkaElasticsearch、HDFS、S3、純text 檔案或者 text socket 等。這次範例使用的是 print(),簡單的將這段程式(稱為 driver program)執行的 client 端當成輸出槽,將所有運算結果回傳到 client 端的 console output。

 

總結


大致上整理了這個影片的一些重點,希望對於分散式運算框架比較陌生的初學者以及在其他叢集運算有經驗、想快速了解 Flink 的老手有幫助。

整體而言,要撰寫一個 Flink 的 streaming 應用,首先於程式中透過 StreamExecutionEnvironment API 獲得一個代表著實體 Flink 叢集的物件,透過此物件會定義最初的串流資料源(Data Source)、經過一系列使用者自行定義的 User Function 轉變(transformation)、以及最後的資料輸出槽(Data Sink)。這些定義會被 Flink Client 拿來形成一個執行計畫(execution plan),遞交給實體叢集進行實質的分散式運算。

最後附上對應的 Slideshare 投影片:

 

廣告

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com Logo

您的留言將使用 WordPress.com 帳號。 登出 / 變更 )

Twitter picture

您的留言將使用 Twitter 帳號。 登出 / 變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 / 變更 )

Google+ photo

您的留言將使用 Google+ 帳號。 登出 / 變更 )

連結到 %s