What Apache Flink
Apache Flink 是一個==分布式大數據處理引擎==,可對==有限數據流和無限數據流==進行==有狀態計算==。可部署在==各種集群環境==,對各種大小的數據規模進行快速計算。
分布式大數據處理引擎
-
是一個分布式的、高可用的用于大數據處理的計算引擎
有限流和無限流
- 有限流:有始有終的數據流。即傳統意義上的批數據,進行批處理
-
無限流:有始無終的數據流。即現實生活中的流數據,進行流處理
有狀態計算
-
良好的狀態機制,進行較好的容錯處理和任務恢復。同時實現 Exactly-Once 語義。
各種集群環境
-
可部署standalone、Flink on yarn、Flink on Mesos、Flink on k8s等等
Flink Application
Streams
數據在真實世界中是不停產生不停發出的,所以數據處理也應該還原真實,做到真正的流處理。而批處理則是流處理的特殊情況
- 即上面說的有限流和無限流,貼官網圖說明。
State
在流計算場景中,其實所有流計算本質上都是增量計算(Incremental Processing)。
例如,計算前幾個小時或者一直以來的某個指標(PV、UV等),計算完一條數據之后需要保存其計算結果即狀態,以便和下一條計算結果合并。
另外,保留計算狀態,進行 CheckPoint 可以很好地實現流計算的容錯和任務恢復,也可以實現Exactly Once處理語義
Time
三類時間:
- Event Time:事件真實產生的時間
- Processing Time:事件被 Flink 程序處理的時間
- Ingestion Time:事件進入到 Flink 程序的時間
API
API分三層,越接近SQL層,越抽象,靈活性越低,但更簡單易用。
- SQL/Table層:直接使用SQL進行數據處理
- DataStream/DataSet API:最核心的API,對流數據進行處理,可在其上實現自定義的WaterMark、Windows、State等操作
- ProcessFunction:也叫RunTime層,最底層的API,帶狀態的事件驅動。
Flink Architecture
Data Pipeline Applications
即 real-time Stream ETL:流式ETL拆分。
通常,ETL都是通過定時任務調度SQL文件或者MR任務來執行的。在實時ETL場景中,將批量ETL邏輯寫到流處理中,分散計算壓力和提高計算結果的實時性。
多用于實時數倉、實時搜索引擎等
Data Analytics Applications
即數據分析,包括流式數據分析和批量數據分析。例如實時報表、實時大屏。
Event-driven Applications
即事件驅動應用,在一個有狀態的計算過程中,通常情況下都是將狀態保存在第三方系統(如Hbase Redis等)中。
而在Flink中,狀態是保存在內部程序中,減少了狀態存取的不必要的I/O開銷,更大吞吐量和更低延時。
第一個 Flink 程序
開發環境要求
主要是Java環境和Maven環境。Java要求JDK1.8,Maven要求3.0以上,開發工具推薦使用 ItelliJ IDEA,社區說法:Eclipse在Java和Scala混合編程下有問題,故不推薦。
代碼示例:
package source.streamDataSource; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCount { public static void main(String[] args) throws Exception{ if(args.length!=2){ System.err.println("Usage:nSocketWindowWordCount hostname port"); } // 獲取程序參數 String hostname = args[0]; int port = Integer.parseInt(args[1]); // 入口類,用于設置環境和參數等 StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置 Time 類型 see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 從指定 IP 端口 讀取流數據,返回一個 DataStreamSource DataStreamSource<String> text = see.socketTextStream(hostname, port, "n", 5); // 在 DataStreamSource 上做操作即 transformation DataStream<Tuple2<String, Integer>> windowCount = text // flatMap , FlatMap接口的實現:將獲取到的數據分割,并每個元素組合成 (word, count)形式 .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception { for (String word : value.split("\s")) { collector.collect(Tuple2.of(word, 1)); } } }) // 按位置指定key,進行聚合操作 .keyBy(0) // 指定窗口大小 .timeWindow(Time.seconds(5)) // 在每個key上做sum // reduce 和 sum 的實現 // .reduce(new ReduceFunction<Tuple2<String, Integer>>() { // @Override // public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception { // return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1+t1.f1); // } // }); .sum(1); // 一個線程執行 windowCount.print().setParallelism(1); see.execute("Socket Window WordCount"); // 其他 transformation 操作示例 // windowCount // .map(new MapFunction<Tuple2<String,Integer>, String>() { // @Override // public String map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception { // return stringIntegerTuple2.f0; // } // }) // .print(); // // text.filter(new FilterFunction<String>() { // @Override // public boolean filter(String s) throws Exception { // return s.contains("h"); // } // }) // .print(); // // SplitStream<String> split = text.split(new OutputSelector<String>() { // @Override // public Iterable<String> select(String value) { // ArrayList<String> strings = new ArrayList<>(); // if (value.contains("h")) // strings.add("Hadoop"); // else // strings.add("noHadoop"); // return strings; // // } // }); // // split.select("hadoop").print(); // split.select("noHadoop").map(new MapFunction<String, String>() { // @Override // public String map(String s) throws Exception { // // return s.toUpperCase(); // } // }).print(); } }