所有的語言開篇都是Hello Word,數(shù)據(jù)處理引擎也有Hello Word。那就是Word Count。MR,Spark,F(xiàn)link以來開篇第一個程序都是Word Count。那么今天Flink開始目標就是在本地調(diào)試出Word Count。

單機安裝Flink
開始Flink之前先在本機嘗試安裝一下Flink,當然FLink正常情況下是部署的集群方式。作者比較窮,機器配置太低開不了幾個虛擬機。所以只能先演示個單機的安裝。
Apache Flink需要在Java1.8+以上的環(huán)境中運行 。
所以,先確保自己的JDK版本是1.8包含以上的。
Flink單機部署非常簡單,只需安裝下載安裝即可。如果需要與Hadoop版本結合,那么下載相應的Hadoop關聯(lián)版本即可。如果不與Hadoop結合就直接下載Scala版即可。我這里就直接下載了Scala2.11的相關版本。

點擊進入Apache頁面進行下載,大小約有283MB。
把下載下來的壓縮包進行解壓即可。

打開命令行直接執(zhí)行
/bin/start-cluster.bat
進行啟動。

瀏覽器打開 http://localhost:8081

至此在Windows10環(huán)境下即完成Flink的啟動。
編寫WordCount
因為Flink是由Scala進行開發(fā)的,而Scala是基于JVM的一種語言。所以最終也會轉(zhuǎn)換為JAVA字節(jié)碼文件,所以Flink程序可以由Java、Scala兩種語言都可以進行開發(fā)。也可以同時開發(fā)。比如Java寫一部分代碼,Scala寫另一部分代碼。可以參考Apache Flink利用Maven對Scala與Java進行混編>。
Flink官方提供快速生成工程的兩種工具:SBT與Maven。由于作者比較熟悉Maven,( 或者說沒用過SBT )。所以直接使用Maven快速創(chuàng)建一個工程。
Java版本
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0
Scala版本
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.8.0
按照提示輸入相關信息,即可生成最終的項目。
├── pom.xml
└── src
└── main
├── resources
│ └── log4j.properties
└── scala/java
└── org
└── myorg
└── quickstart
├── BatchJob.scala
└── StreamingJob.scala
把工程導入到IDEA中
如果使用Scala的話,那么需要安裝Scala的插件。搜索安裝同時需要把Scala語言包進行安裝。
不知道如何操作可以聯(lián)系我 微信公號指尖數(shù)蟲>。
package jar;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//讀取目錄下的文件
DataSourceString> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
//把文件中的內(nèi)容按照空格進行拆分為 word,1 1 是為了能夠在下面進行計算.
data.flatMap(new FlatMapFunctionString, Tuple2String, Integer>>() {
@Override
public void flatMap(String s, CollectorTuple2String, Integer>> collector) throws Exception {
for (String word : s.split(" ")){
collector.collect(new Tuple2>(word,1));
}
}
})
// 按照元組中的第1位進行分組
.groupBy(0)
// 分組的元組的計算方式為 value +value 也就是剛才的 同樣的詞 把 1+1
.reduce(new ReduceFunctionTuple2String, Integer>>() {
@Override
public Tuple2String, Integer> reduce(Tuple2String, Integer> t2, Tuple2String, Integer> t1) throws Exception {
return new Tuple2>(t1.f0,t1.f1+ t2.f1);
}
})
//輸出結果
.print();
}
}
總結
以上所述是小編給大家介紹的大數(shù)據(jù)HelloWorld-Flink實現(xiàn)WordCount,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
如果你覺得本文對你有幫助,歡迎轉(zhuǎn)載,煩請注明出處,謝謝!
您可能感興趣的文章:- 淺談實時計算框架Flink集群搭建與運行機制
- 詳解大數(shù)據(jù)處理引擎Flink內(nèi)存管理
- Apache FlinkCEP 實現(xiàn)超時狀態(tài)監(jiān)控的步驟詳解
- Flink支持哪些數(shù)據(jù)類型?
- Java lambda表達式實現(xiàn)Flink WordCount過程解析
- 解析Flink內(nèi)核原理與實現(xiàn)核心抽象