上篇部落格給大家講解了Flink的入門及dataSource 點選,本篇部落格給講解下Flink的17種常用的運算元,本篇部落格比較長,耐心看完(注意:面試經常被問到,建議收藏,如要對你有幫助的話麻煩,點贊 關注 評論)。Flink專欄
需求: 將 DataSet 中的每一個元素轉換為另外一個元素
範例: 使用 map 操作,將以下資料 「1,張三」, 「2,李四」, 「3,王五」, 「4,趙六」 轉換為一個 scala 的樣例類。
實現步驟:
參考程式碼
import org.apache.flink.api.scala._
/**
* @author 需求: 使用Map將資料轉換成樣例類
* @date 2020/9/8 23:26
* @version 1.0
*/
object BatchMap {
case class User(name:String,age:String)
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements
val data = env.fromElements("張三,19", "李四,30", "劉恆,299")
//3.使用map將資料封裝成樣例類
val userDataSet = data.map(s => {
User(s.split(",")(0), s.split(",")(1))
})
//4.將資料輸出
userDataSet.print()
}
}
需求: 將 DataSet 中的每一個元素轉換為 0…n 個元素
範例: 分別將以下資料,轉換成 國家 、省份 、城市 三個維度的資料。
將以下資料
張三,中國,江西省,南昌市
李四,中國,河北省,石家莊市
Tom,America,NewYork,Manhattan
轉換為
張三,中國
張三,中國江西省
張三,中國江西省南昌市
解題思路
以上資料為一條轉換為三條,顯然,應當使用 flatMap 來實現 分別在 flatMap 函數中構建三個資料,並放入到一個列表中
顯示結果
姓名, 國家
姓名, 國家省份
姓名, 國家省份城市
實現步驟:
程式碼實現:
import org.apache.flink.api.scala._
/**
* @author 需求:
* 將"張三,中國,江西省,南昌市",
* "李四,中國,河北省,石家莊市",
* "Tom,America,NewYork,Manhattan"
* 轉換為:
* 張三,中國
* 張三,中國江西省
* 張三,中國江西省南昌市
* @date 2020/9/9 0:11
* @version 1.0
*/
object BachFlatMap {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromCollection構建資料集
val dataSource = env.fromCollection(List("張三,中國,江西省,南昌市", "李四,中國,河北省,石家莊市", "Tom,America,NewYork,Manhattan"))
val flatMap: DataSet[((String, String), (String, String), (String, String))] = dataSource.flatMap(line => {
val arr = line.split(",")
List(
((arr(0), arr(1)),
(arr(0), arr(1) + arr(2)),
(arr(0), arr(1) + arr(2) + arr(3))))
})
flatMap.print()
}
}
需求: 將一個分割區中的元素轉換為另一個元素
範例: 使用 mapPartition 操作,將以下資料 「1,張三」, 「2,李四」, 「3,王五」, 「4,趙六」 轉換為一個 scala 的樣例類。
實現步驟:
程式碼實現:
import org.apache.flink.api.scala._
/**
* @author 需求:將一天分割區中的資料轉換為一個樣例類
* "1,張三", "2,李四", "3,王五", "4,趙六"
* @date 2020/9/9 21:57
* @version 1.0
*/
object BachMapPartition {
case class User(id:String,name:String)
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用FromElements構建資料集
val dataSource = env.fromElements("1,張三", "2,李四", "3,王五", "4,趙六")
//3.資料處理
val mapPartitionDS: DataSet[User] = dataSource.mapPartition(textPartition => {
textPartition.map( x => {
val arrs = x.split(",")
User(arrs(0), arrs(1))
})
})
//4.結果輸出
mapPartitionDS.print()
}
}
需求: 過濾出來 一些符合條件的元素
Filter作用: Filter 函數在實際生產中特別實用,資料處理階段可以過濾掉大部分不符合業務的內容, 可以極 大減輕整體 flink 的運算壓力
範例: 使用filter過濾掉大於10的數位
實現步驟:
參考程式碼
import org.apache.flink.api.scala._
/**
* @author 需求:使用filter過濾掉大於10的數位
* 過濾出來 一些符合條件的元素 Filter 函數在實際生產中特別實用,資料處理階段可以過濾掉大部分不符合業務的內容,
* 可以極 大減輕整體 flink 的運算壓力
* @date 2020/9/9 22:35
* @version 1.0
*/
object BachFilter {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建1-20的資料集
val dataSource = env.generateSequence(1, 20)
//3.處理資料
val filter = dataSource.filter(_ < 10)
//4.結果輸出
filter.print()
}
}
需求: 可以對一個 dataset 或者一個 group 來進行聚合計算,最終聚合成一個元素
範例: 請將以下元組資料,使用 reduce 操作聚合成一個最終結果 (「java」 , 1) , (「java」, 1) ,(「java」 , 1) 將上傳元素資料轉換為 (「java」,3)
實現步驟:
參考程式碼:
import org.apache.flink.api.scala._
/**
* @author 需求:請將以下元組資料,使用 reduce 操作聚合成一個最終結果
* ("java" , 1) , ("java", 1) ,("java" , 1)
* @date 2020/9/9 22:39
* @version 1.0
*/
object BachReduce {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements 構建資料集
val dataSource = env.fromElements(("java", 1), ("java", 1), ("java", 2))
//3.資料處理(根據key進行分組)
val values = dataSource.groupBy(_._1)
//4.使用reduce進行合併
val reduce = values.reduce((v1, v2) => (v1._1, v2._2 + v1._2))
//4.結果輸出
reduce.print()
}
}
可以對一個 dataset 或者一個 group 來進行聚合計算,最終聚合成一個元素 reduce 和 reduceGroup 的 區別
首先 groupBy 函數會將一個個的單詞進行分組,分組後的資料被 reduce 一個個的拉 取過來,這種方式如果資料量大的情況下,拉取的資料會非常多,增加了網路 IO。
reduceGroup 是 reduce 的一種優化方案; 它會先分組 reduce,然後在做整體的 reduce;這樣做的好處就是可以減少網路 IO。
範例: 請將以下元組資料,下按照單詞使用 groupBy 進行分組,再使用 reduceGroup 操作進行單 詞計數(「java」 , 1) , (「java」, 1) ,(「scala」 , 1)
實現步驟:
參考程式碼
import org.apache.flink.api.scala._
/**
* @author 請將以下元組資料,下按照單詞使用 groupBy 進行分組,再使用 reduceGroup 操作進行單 詞計數("java" , 1) , ("java", 1) ,("scala" , 1)
* @date 2020/9/11 22:15
* @version 1.0
*/
object BachReduceGroup {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集
val source = env.fromElements(("java", 1), ("Flin", 1), ("巨量資料", 1), ("java", 2))
//3.使用reduceGroup進行分組求和
val result = source.groupBy(0).reduceGroup(group => (group.reduce((a, b) => (a._1, a._2 + b._2))))
//4.輸出
result.print()
}
}
介紹: 按照內建的方式來進行聚合。例如:SUM/MIN/MAX…
範例: 請將以下元組資料,使用 aggregate 操作進行單詞統計 (「java」, 1), (「巨量資料」, 2), (「巨量資料」, 10)
實現步驟:
參考程式碼
import org.apache.flink.api.scala._
/**
* @author 請將以下元組資料,使用 aggregate 操作進行單詞統計 ("java", 1), ("巨量資料", 2), ("巨量資料", 10)
* @date 2020/9/11 22:30
* @version 1.0
*/
object BachAggregate {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements構建資料集
val sourceData = env.fromElements(("java", 1), ("巨量資料", 2), ("巨量資料", 10))
//3.使用groupBy進行分組然後使用aggregate求出最大值
val result = sourceData.groupBy(0).aggregate(Aggregations.MAX, 1)
//4.結果輸出
result.print()
}
}
介紹: 獲取指定欄位的最大值、最小值
範例: 請將以下元組資料,使用 aggregate 操作進行單詞統計 (1, 「yuwen」, 89.0) , (2, 「shuxue」, 92.2),(3, 「yingyu」, 89.99),(4, 「wuli」, 98.9), (1, 「yuwen」, 88.88),(1, 「wuli」, 93.00),(1, 「yuwen」, 94.3)
實現步驟:
參考程式碼:
import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.util.Random
/**
* @author
* @date 2020/9/11 22:40
* @version 1.0
*/
object BachMinByAndMaxBy {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集
val data = new mutable.MutableList[(Int, String, Double)]
data += ((1, "liuheng", 89.0))
data += ((2, "shuxue", 92.2))
data += ((3, "yingyu", 89.99))
data += ((4, "wuli", 98.9))
data += ((1, "yuwen", 88.88))
data += ((1, "wuli", 93.00))
data += ((1, "yuwen", 94.3))
val sourceData = env.fromCollection(Random.shuffle(data))
//3.使用MinBy求出最小值與MaxBy求出最大值
val min = sourceData.groupBy(1).minBy(2)
val max = sourceData.groupBy(1).maxBy(2)
//4.輸出最小值
min.print()
println("-----------------------------")
//5.輸出最大值
max.print()
}
}
介紹: 去除重複的資料
範例: 請將以下元組資料,使用 distinct 操作去除重複的單詞 (「java」 , 1) , (「java」, 1) ,(「scala」 , 1) 去重得到 (「java」, 1), (「scala」, 1)
實現步驟:
參考程式碼:
import org.apache.flink.api.scala._
/**
* @author 需求:使用distinct求("java", 1), ("java", 2), ("scala", 1) 去掉重複的資料
* @date 2020/9/12 22:56
* @version 1.0
*/
object BachDistinct {
def main(args: Array[String]): Unit = {
//1.構建執行環境(上下文物件)
val env = ExecutionEnvironment.getExecutionEnvironment
//2.使用fromElements
val dataSource = env.fromElements(("java", 1), ("java", 2), ("scala", 1))
//3.使用distinct去掉重複的
val distinct = dataSource.distinct(0)
//4.結果輸出
distinct.print()
}
}
介紹: 使用 join 可以將兩個 DataSet 連線起來
範例: 有兩個 csv 檔案,有一個為 score.csv ,一個為 subject.csv ,分 別儲存了成績資料以及學科資料
sorce.csv
1,語數
2,英物
3,化生
4,文學
5,語理
6,學物
subject.csv
1,張三,1,98
2,張三,2,77.5
3,張三,3,89
4,張三,4,65
5,張三,5,78
6,張三,6,70
9,李四,3,65
10,李四,4,78
11,李四,5,70
12,李四,6,78
13,王五,1,70
14,王五,2,78
實現步驟:
參開程式碼:
import org.apache.flink.api.scala._
/**
* @author 需求:使用join的方式將sorce.csv檔案與subject.csv檔案進行關聯
* @date 2020/9/12 23:38
* @version 1.0
*/
object BachJoin {
//構建樣例類
case class sorce(id:String,subject:String)
case class subject(id:String,name:String,sid:String,source:String)
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集(使用檔案方式)
val sorce = env.readCsvFile[sorce]("./data/score.csv")
val subject = env.readCsvFile[subject]("./data/subject.csv")
//3.使用join將兩個檔案中的資料進行關聯
val joinData = sorce.join(subject).where(_.id).equalTo(_.sid)
//4.結果輸出
joinData.print()
}
}
介紹: 左外連線,左邊的 Dataset 中的每一個元素,去連線右邊的元素
範例: 請將以下元組資料
(使用者 id,使用者姓名)
(1, 「zhangsan」) ,
(2, 「lisi」) ,
(3 , 「wangwu」) ,
(4 , 「zhaoliu」)
元組資料
(使用者 id,所在城市)
(1, 「beijing」),
(2, 「shanghai」),
(4, 「guangzhou」)
返回如下資料:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
參考程式碼
import scala.collection.mutable.ListBuffer
/**
* @author 需求:使用左連線 請將以下元組資料(使用者 id,使用者姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元組資料
* (使用者 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下資料:
* (3,wangwu,null)
* (1,zhangsan,beijing)
* (2,lisi,shanghai)
* (4,zhaoliu,guangzhou)
* @date 2020/9/15 23:30
* @version 1.0
*/
object BachLeftOuterJoin {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.建立要測試的資料集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection構建資料集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用leftOuterJoin 進行關聯
val result = test1.leftOuterJoin(test2).where(0).equalTo(0).apply((first, second)=>{
if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.結果輸出
result.print()
}
}
範例: 右外連線,左邊的 Dataset 中的每一個元素,去連線左邊的元素
範例: 請將以下元組資料
(使用者 id,使用者姓名)
(1, 「zhangsan」) ,
(2, 「lisi」) ,
(3 , 「wangwu」) ,
(4 , 「zhaoliu」)
元組資料
(使用者 id,所在城市)
(1, 「beijing」),
(2, 「shanghai」),
(4, 「guangzhou」)
返回如下資料:
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
參考程式碼
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:使用左連線 請將以下元組資料(使用者 id,使用者姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元組資料
* (使用者 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下資料:
* (1,zhangsan,beijing)
* (4,zhaoliu,guangzhou)
* (2,lisi,shanghai)
* @date 2020/9/15 23:30
* @version 1.0
*/
object BachRightOuterJoin {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.建立要測試的資料集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection構建資料集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用rightOuterJoin 進行關聯
val result = test1.rightOuterJoin(test2).where(0).equalTo(0).apply((first, second)=>{
if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.結果輸出
result.print()
}
}
介紹: 全外連線,左右兩邊的元素,全部連線
範例: 請將以下元組資料
(使用者 id,使用者姓名)
(1, 「zhangsan」) ,
(2, 「lisi」) ,
(3 , 「wangwu」) ,
(4 , 「zhaoliu」)
元組資料
(使用者 id,所在城市)
(1, 「beijing」),
(2, 「shanghai」),
(4, 「guangzhou」)
返回如下資料:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
(4,zhaoliu,guangzhou)
擴充套件:
參考程式碼:
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:
* 請將以下元組資料(使用者 id,使用者姓名)
* (1, "zhangsan") ,
* (2, "lisi") ,
* (3 , "wangwu") ,
* (4 , "zhaoliu")
* 元組資料(使用者 id,所在城市)
* (1, "beijing"),
* (2, "shanghai"),
* (4, "guangzhou")
* 返回如下資料:
* (3,wangwu,null)
* (1,zhangsan,beijing)
* (2,lisi,shanghai)
* (4,zhaoliu,guangzhou)
* @date 2020/9/15 23:43
* @version 1.0
*/
object BachFullOuterJoin {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.建立要測試的資料集
val data1 = ListBuffer[Tuple2[Int, String]]()
data1.append((1, "zhangsan"))
data1.append((2, "lisi"))
data1.append((3, "wangwu"))
data1.append((4, "zhaoliu"))
val data2 = ListBuffer[Tuple2[Int, String]]()
data2.append((1, "beijing"))
data2.append((2, "shanghai"))
data2.append((4, "guangzhou"))
//2.2 使用fromCollection構建資料集
val test1 = env.fromCollection(data1)
val test2 = env.fromCollection(data2)
//3.使用fullOuterJoin 進行關聯
val result = test1.fullOuterJoin(test2,JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0).apply((first, second)=>{
if (first==null){
(second._1,"null",second._2)
} else if (second==null){
(first._1,first._2,"null")
}else{
(first._1,first._2,second._2)
}
})
//4.結果輸出
result.print()
}
}
介紹: 和 join 類似,但是這種交叉操作會產生笛卡爾積,在資料比較大的時候,是非常消耗記憶體 的操作;
範例: 請將以下元組資料 (1, 4, 7), (2, 5, 8), (3, 6, 9)
元組資料 (10, 40, 70), (20, 50, 80), (30, 60, 90)
進行笛卡爾積,返回如下資料:
Buffer(((1,4,7),(10,40,70)), ((1,4,7),(20,50,80)), ((1,4,7),(30,60,90)), ((2,5,8),(10,40,70)), ((2,5,8),(20,50,80)), ((2,5,8),(30,60,90)), ((3,6,9),(10,40,70)), ((3,6,9),(20,50,80)), ((3,6,9),(30,60,90)))
參考程式碼:
import org.apache.flink.api.scala._
/**
* @author
* @date 2020/9/15 23:50
* @version 1.0
*/
object BatchCross {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
println("==============================cross=======================================")
cross(env)
println("==============================crossWithTiny=======================================")
crossWithTiny(env)
println("==============================crossWithHuge=======================================")
crossWithHuge(env)
}
/**
* 笛卡爾集
*
* @param env
*/
def cross(env: ExecutionEnvironment) = {
//1.使用 fromElements定義兩個dataSet
val data1 = env.fromElements((1, 4, 7), (2, 5, 8), (3, 6, 9))
val data2 = env.fromElements((10, 40, 70), (20, 50, 80), (30, 60, 90))
val result = data1.cross(data2)
println(result.collect())
}
/**
* 暗示第二個輸入較小的交叉
*
* @param env
*/
def crossWithTiny(env: ExecutionEnvironment) = {
//1.定義 case class
case class Coord(id: Int, x: Int, y: Int)
val data1: DataSet[Coord] = env.fromElements( Coord(2, 5, 8), Coord(1, 4, 7),Coord(3, 6, 9))
val data2: DataSet[Coord] = env.fromElements( Coord(20, 50, 80),Coord(10, 40, 70), Coord(30, 60, 90))
val result = data1.crossWithTiny(data2)
result.print()
}
def crossWithHuge(env: ExecutionEnvironment) = {
//1.定義 case class
case class Coord(id: Int, x: Int, y: Int)
val data1: DataSet[Coord] = env.fromElements(Coord(1, 4, 7), Coord(2, 5, 8), Coord(3, 6, 9))
val data2: DataSet[Coord] = env.fromElements(Coord(10, 40, 70), Coord(20, 50, 80), Coord(30, 60, 90))
val result = data1.crossWithHuge(data2)
result.print()
}
}
介紹: 將多個 DataSet 合併成一個 DataSet【注意】:union 合併的 DataSet 的型別必須是一致 的
範例:
將以下資料進行取並集操作
資料集
1 「hadoop」, 「hive」, 「flume」
資料集 2
「hadoop」, 「hive」, 「spark」
實現步驟:
參考程式碼:
import org.apache.flink.api.scala._
/**
* @author 需求:
* 將以下資料進行取並集操作
* 資料集
* 1 "hadoop", "hive", "flume"
* 資料集
* 2 "hadoop", "hive", "spark
* @date 2020/9/16 0:05
* @version 1.0
*/
object BachUnion {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集
val data1 = env.fromCollection(List("hadoop", "hive", "flume"))
val data2 = env.fromCollection(List("hadoop", "hive", "spark"))
val result = data1.union(data2)
result.print()
}
}
介紹:
Flink 也有資料傾斜的時候,比如當前有資料量大概 10 億條資料需要處理,在處理過程中 可能會 發生如圖所示的狀況:
這個時候本來總體資料量只需要 10 分鐘解決的問題,出現了資料傾斜,機器 1 上的 任務需 要 4 個小時才能完成,那麼其他 3 臺機器執行完畢也要等待機器 1 執行完畢後才 算整體將任務完成;所以在實際的工作中,出現這種情況比較好的解決方案就是—rebalance(內 部使用 round robin 方法將資料均勻打散。這對於資料傾斜時是很 好的選擇。)
實現步驟:
程式碼實現:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
/**
* @author 對資料集進行再平衡,重分割區,消除資料傾斜
* @date 2020/9/16 0:21
* @version 1.0
*/
object BatchRebalance {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集
val data = env.generateSequence(0, 100)
val ds = data.filter(_ > 8)
//3.對資料進行在平衡操作
val value1 = ds.rebalance().map(new RichMapFunction[Long, (Int, Long)] {
override def map(value: Long): (Int, Long) = {
(getRuntimeContext.getIndexOfThisSubtask, value)
}
})
//4.結果輸出
value1.print()
}
}
介紹: 根據給定的 key 對一個資料集取前 N 條資料(往往在公司中是經常用到了,比如頭條中的熱搜Top10)
實現步驟:
參考程式碼:
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import scala.collection.mutable.ListBuffer
/**
* @author 需求:根據給定的 key 對一個資料集取前 N 條資料
* @date 2020/9/16 19:07
* @version 1.0
*/
object BachFirst {
def main(args: Array[String]): Unit = {
//1.構建執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2.構建資料集
val data = ListBuffer[Tuple2[Int,String]]()
data.append((2,"zs"))
data.append((4,"ls"))
data.append((3,"ww"))
data.append((1,"xw"))
data.append((1,"aw"))
data.append((1,"mw"))
val text = env.fromCollection(data)
//3.使用first去前三條資料
val first = text.first(3)
val sortFirst = text.sortPartition(0, Order.ASCENDING).sortPartition(1, Order.DESCENDING).first(3)
//4.結果資料
first.print()
sortFirst.print()
}
}