- Setup
- 1. map
  - 1.1 Java Stream map
  - 1.2 Spark map
    - 1.2.1 MapFunction
    - 1.2.2 MapPartitionsFunction
- 2. flatMap
  - 2.1 Java Stream flatMap
  - 2.2 Spark flatMap
- 3. groupBy
  - 3.1 Java Stream groupBy
  - 3.2 Spark groupBy
  - 3.3 Spark groupByKey
- 4. reduce
  - 4.1 Java Stream reduce
  - 4.2 Spark reduce
As the saying goes, when a man pursues a woman there is a mountain between them, but when a woman pursues a man there is only a thin veil.
If learning big data from scratch feels like facing a mountain, then as long as you know Java streams, or the stream API of any other language, big data is only a veil away.
This article uses Java Stream computations as the reference point to explain some basic Spark operations. The same ideas apply to Flink, the other popular big data framework.
Setup
Prepare a test file f:/test.txt with the following content:
张三,20,研发部,普通员工
李四,31,研发部,普通员工
李丽,36,财务部,普通员工
张伟,38,研发部,经理
杜航,25,人事部,普通员工
周歌,28,研发部,普通员工
Create an Employee class:
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
static class Employee implements Serializable {
    private String name;
    private Integer age;
    private String department;
    private String level;
}
Versions:
jdk: 1.8
spark: 3.2.0
scala: 2.12.15
The Scala version above is only needed because the Spark framework itself depends on Scala.
Since Scala is a relatively niche language, this article demonstrates the Spark code in Java.
1. map
1.1 Java Stream map
map is a one-to-one transformation: each element is mapped to a new object. This idea is the same in Java streams, Spark, and Flink.
The following Java Stream code reads the file and maps each line to an Employee object.
List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
List<Employee> employeeList = list.stream().map(word -> {
    List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
    Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
    return employee;
}).collect(Collectors.toList());
employeeList.forEach(System.out::println);
The converted data:
JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
1.2 Spark map
First obtain a SparkSession, read the file, and get a Dataset, Spark's resilient distributed dataset object.
SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
Dataset<Row> reader = session.read().text("F:/test.txt");
reader.show();
Here show() prints the current dataset; it is an action operator.
The result:
+-----------------------+
| value|
+-----------------------+
|张三,20,研发部,普通员工|
|李四,31,研发部,普通员工|
|李丽,36,财务部,普通员工|
| 张伟,38,研发部,经理|
|杜航,25,人事部,普通员工|
|周歌,28,研发部,普通员工|
+-----------------------+
Now that we have the raw data, we use map, a one-to-one operation, to convert each row into an Employee object.
We deliberately avoid lambda expressions here so the structure is easier to see.
We implement the call method of the MapFunction interface: each invocation receives one row, which we split and convert into an object.
All the operators demonstrated in this article are Transformation operators.
Typical Action operators include show, collect, save and the like, for example calling show to inspect results locally, or saving the final result to a database or to HDFS after the job finishes.
Keep in mind that the driver distributes the code to the executors on the nodes of the cluster and does not itself take part in the computation. In general, code outside an operator, such as point a in the example below, runs on the driver, while code inside an operator, point b, runs on executors on different machines.
So be careful when a variable defined outside an operator is used inside one. Do not assume that everything written in the same main method necessarily runs in the same JVM.
Serialization is involved, and because the objects live in different JVMs, comparing them with "==" can also misbehave.
This is a mindset a backend web developer has to adjust when moving to big data. In short, in backend web services we implement the distribution ourselves, whereas in big data the framework provides the distribution out of the box.
Why wrap the parsing below in try/catch? Because you never know how well the upstream data has been cleaned; a single dirty record can throw an exception, and if it is not caught the whole application goes down.
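To illustrate the driver/executor point above, here is a minimal sketch (the marker variable and the resulting dataset are illustrative, not part of the original code): a driver-side variable referenced inside an operator is captured into the closure, serialized, and shipped to every executor.
// Hypothetical sketch: 'marker' is created on the driver, but the lambda body runs on executors,
// so it is captured into the closure, serialized, and copied into each executor's JVM.
String marker = "_raw";                                            // a: driver side
Dataset<String> tagged = reader.map(
        (MapFunction<Row, String>) row -> row.mkString() + marker, // b: executor side
        Encoders.STRING());
tagged.show();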
1.2.1 MapFunction
// a: outside the operator, runs on the driver
Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
    @Override
    public Employee call(Row row) throws Exception {
        // b: inside the operator, runs on the executors
        Employee employee = null;
        try {
            // gson.fromJson(...); using gson here would involve serialization issues
            List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
            employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
        } catch (Exception exception) {
            // log the error here
            // a streaming job has to run 24/7; any dirty upstream record could otherwise fail the task and kill the job, so catch exceptions here
            exception.printStackTrace();
        }
        return employee;
    }
}, Encoders.bean(Employee.class));
employeeDataset.show();
Output:
+---+----------+--------+----+
|age|department| level|name|
+---+----------+--------+----+
| 20| 研发部|普通员工|张三|
| 31| 研发部|普通员工|李四|
| 36| 财务部|普通员工|李丽|
| 38| 研发部| 经理|张伟|
| 25| 人事部|普通员工|杜航|
| 28|    研发部|普通员工|周歌|
+---+----------+--------+----+
1.2.2 MapPartitionsFunction
What is the difference between map and mapPartitions in Spark?
map processes data one record at a time.
mapPartitions processes data one partition at a time.
Is mapPartitions therefore always faster? Not necessarily; it depends on the situation.
Here we receive a whole batch of rows, map them one-to-one into objects, and return the batch as an Iterator.
Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
    @Override
    public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
        List<Employee> employeeList = new ArrayList<>();
        while (iterator.hasNext()) {
            Row row = iterator.next();
            try {
                List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee);
            } catch (Exception exception) {
                // log the error here
                // a streaming job has to run 24/7; any dirty upstream record could otherwise fail the task and kill the job, so catch exceptions here
                exception.printStackTrace();
            }
        }
        return employeeList.iterator();
    }
}, Encoders.bean(Employee.class));
employeeDataset2.show();
The output is the same as with map, so it is not repeated here.
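A common reason to prefer mapPartitions is to initialize an expensive or non-serializable resource once per partition instead of once per record. Here is a minimal sketch using the Gson instance mentioned earlier (it assumes the gson dependency from the pom at the end; the dataset and variable names are illustrative, not part of the original code):
Dataset<String> employeeJson = employeeDataset.mapPartitions(new MapPartitionsFunction<Employee, String>() {
    @Override
    public Iterator<String> call(Iterator<Employee> iterator) throws Exception {
        Gson gson = new Gson();                    // built once per partition, on the executor
        List<String> jsonList = new ArrayList<>();
        while (iterator.hasNext()) {
            jsonList.add(gson.toJson(iterator.next()));   // serialize each Employee to JSON
        }
        return jsonList.iterator();
    }
}, Encoders.STRING());
employeeJson.show(false);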
2. flatMap
map is one-to-one; flatMap is one-to-many.
In Java Stream terms, flatMap is usually described as "flattening".
2.1 Java Stream flatMap
The following code maps each original record to two objects and returns both.
List<Employee> employeeList2 = list.stream().flatMap(word -> {
    List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
    List<Employee> lists = new ArrayList<>();
    Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
    lists.add(employee);
    Employee employee2 = new Employee(words.get(0) + "_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
    lists.add(employee2);
    return lists.stream();
}).collect(Collectors.toList());
employeeList2.forEach(System.out::println);
Output:
JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=张三_2, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四_2, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=李丽_2, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=张伟_2, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=杜航_2, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=周歌_2, age=28, department=研发部, level=普通员工)
2.2 Spark flatMap
Here we implement the call method of FlatMapFunction: each call receives one row, and because the return value is an Iterator, it can emit multiple records.
Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
    @Override
    public Iterator<Employee> call(Row row) throws Exception {
        List<Employee> employeeList = new ArrayList<>();
        try {
            List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
            Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            employeeList.add(employee);
            Employee employee2 = new Employee(list.get(0) + "_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            employeeList.add(employee2);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
        return employeeList.iterator();
    }
}, Encoders.bean(Employee.class));
employeeDatasetFlatmap.show();
Output:
+---+----------+--------+------+
|age|department| level| name|
+---+----------+--------+------+
| 20| 研发部|普通员工| 张三|
| 20| 研发部|普通员工|张三_2|
| 31| 研发部|普通员工| 李四|
| 31| 研发部|普通员工|李四_2|
| 36| 财务部|普通员工| 李丽|
| 36| 财务部|普通员工|李丽_2|
| 38| 研发部| 经理| 张伟|
| 38| 研发部| 经理|张伟_2|
| 25| 人事部|普通员工| 杜航|
| 25| 人事部|普通员工|杜航_2|
| 28| 研发部|普通员工| 周歌|
| 28| 研发部|普通员工|周歌_2|
+---+----------+--------+------+
3. groupBy
As in SQL, both Java Stream and Spark can group a dataset with groupBy and then apply aggregate functions on top of the groups, or simply obtain the sub-dataset belonging to each group.
3.1 Java Stream groupBy
Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
System.out.println(map);
Output:
{财务部=1, 人事部=1, 研发部=4}
3.2 Spark groupBy
Group the dataset of mapped objects by department, then count the employees and compute the average age per department.
RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
// number of employees per department
datasetGroupBy.count().show();
// average age per department
datasetGroupBy.avg("age").withColumnRenamed("avg(age)", "avgAge").show();
The outputs are, respectively:
+----------+-----+
|department|count|
+----------+-----+
| 财务部| 1|
| 人事部| 1|
| 研发部| 4|
+----------+-----+
+----------+------+
|department|avgAge|
+----------+------+
| 财务部| 36.0|
| 人事部| 25.0|
| 研发部| 29.25|
+----------+------+
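Both aggregations can also be computed in one pass with agg and Spark's built-in aggregate functions. A minimal sketch (the column aliases are illustrative, not from the original code):
// requires: import static org.apache.spark.sql.functions.avg;
// requires: import static org.apache.spark.sql.functions.count;
employeeDataset.groupBy("department")
        .agg(count("*").as("employeeCount"), avg("age").as("avgAge"))
        .show();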
3.3 Spark groupByKey
The difference between Spark's groupBy and groupByKey: the former goes on to apply aggregate functions and produces aggregated values, while the latter only groups the data and performs no computation.
groupByKey is similar to the following Java Stream code:
Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println(map2);
Output:
{财务部=[JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)],
人事部=[JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)],
研发部=[
JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工),
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工),
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理),
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)]}
Now with Spark groupByKey.
First we obtain a key-value dataset, where each key maps to multiple records.
The call() method here returns the key, i.e. the grouping key.
KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
    @Override
    public String call(Employee employee) throws Exception {
        // return the grouping key; here we group by department
        return employee.getDepartment();
    }
}, Encoders.STRING());
Then call mapGroups on the keyValueGroupedDataset; inside call() you get each key together with all of its original records.
keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
    @Override
    public Object call(Object key, Iterator iterator) throws Exception {
        System.out.println("key = " + key);
        while (iterator.hasNext()) {
            System.out.println(iterator.next());
        }
        return iterator;
    }
}, Encoders.bean(Iterator.class))
.show(); // show() here is not meaningful in itself, it only triggers the computation
Output:
key = 人事部
SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
key = 研发部
SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
key = 财务部
SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
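A more typical mapGroups usage returns one typed value per group instead of printing inside the operator. A minimal sketch (the variable names and the "department=count" output format are illustrative, not from the original code):
KeyValueGroupedDataset<String, Employee> byDepartment = employeeDataset.groupByKey(
        (MapFunction<Employee, String>) Employee::getDepartment, Encoders.STRING());
Dataset<String> departmentCounts = byDepartment.mapGroups(
        (MapGroupsFunction<String, Employee, String>) (key, values) -> {
            long count = 0;
            while (values.hasNext()) {   // consume this group's records
                values.next();
                count++;
            }
            return key + "=" + count;    // e.g. "研发部=4"
        }, Encoders.STRING());
departmentCounts.show();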
4. reduce
reduce literally means to decrease, diminish, or shrink; it is also called reduction.
The current object and the previous object are combined pairwise, the result of each step becomes the "previous object" of the next step, and in the end a single object is produced.
For example, given five values [1, 2, 3, 4, 5], a summing reduce computes 1+2=3, then 3+3=6, 6+4=10, and 10+5=15, ending with a single result.
4.1 Java Stream reduce
int age = employeeList.stream().mapToInt(e -> e.age).sum();
System.out.println(age); // 178
The same computation can also be done with reduce:
int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0, (a, b) -> a + b);
System.out.println(age1); // 178
But what if I want to sum the ages and at the same time get back a complete object, like this one?
JavaStreamDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
We can use reduce to walk the dataset pairwise, accumulate the ages, and return the last object visited.
In the code below, pre is the previous object and current is the current object.
Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre, current) -> {
    // on the first iteration the previous object is the empty identity, so its age is null
    if (pre.getAge() == null) {
        current.setAge(current.getAge());
    } else {
        current.setAge(pre.getAge() + current.getAge());
    }
    return current;
});
System.out.println(reduceEmployee);
4.2 Spark reduce
Spark's reduce follows the same basic idea as Java Stream's.
Straight to the code:
Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
    @Override
    public Employee call(Employee t1, Employee t2) throws Exception {
        // depending on the Spark version you may need to check whether t1 is null
        t2.setAge(t1.getAge() + t2.getAge());
        return t2;
    }
});
System.out.println(datasetReduce);
Output:
SparkDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
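As an aside (not part of the original code), for a plain column sum Spark's built-in aggregate is usually more idiomatic than reduce; a minimal sketch:
// requires: import static org.apache.spark.sql.functions.sum;
employeeDataset.agg(sum("age").as("totalAge")).show();   // one row: totalAge = 178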
Other common operations
Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
System.out.println(employee);
// SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
You can also register the dataset as a table and use the far more expressive SQL for all kinds of computation.
SQL is now a first-class citizen in Flink, and Spark is no slouch either.
Here is a very simple example:
employeeDataset.registerTempTable("table");
session.sql("select * from table where age > 30 order by age desc limit 3").show();
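A small side note (not in the original code): registerTempTable has been deprecated since Spark 2.0, and createOrReplaceTempView is the current equivalent; the view name below is illustrative.
// equivalent, non-deprecated API
employeeDataset.createOrReplaceTempView("employee_view");
session.sql("select * from employee_view where age > 30 order by age desc limit 3").show();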
Summary
Based on its similarity to Java Stream, this article introduced some common Spark operators.
It is only meant as a very simple introduction.
If you are interested, backend developers can try it out very easily: no local environment needs to be set up, you only have to add the Spark Maven dependencies.
All the code from this article is attached below.
The complete Java Stream demo code:
import lombok.*;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JavaStreamDemo {
    public static void main(String[] args) throws IOException {
        /**
         * 张三,20,研发部,普通员工
         * 李四,31,研发部,普通员工
         * 李丽,36,财务部,普通员工
         * 张伟,38,研发部,经理
         * 杜航,25,人事部,普通员工
         * 周歌,28,研发部,普通员工
         */
        List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
        List<Employee> employeeList = list.stream().map(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            return employee;
        }).collect(Collectors.toList());
        // employeeList.forEach(System.out::println);

        List<Employee> employeeList2 = list.stream().flatMap(word -> {
            List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
            List<Employee> lists = new ArrayList<>();
            Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee);
            Employee employee2 = new Employee(words.get(0) + "_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
            lists.add(employee2);
            return lists.stream();
        }).collect(Collectors.toList());
        // employeeList2.forEach(System.out::println);

        Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
        System.out.println(map);

        Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
        System.out.println(map2);

        int age = employeeList.stream().mapToInt(e -> e.age).sum();
        System.out.println(age); // 178
        int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0, (a, b) -> a + b);
        System.out.println(age1); // 178

        /**
         * pre is the previous object
         * current is the current object
         */
        Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre, current) -> {
            if (pre.getAge() == null) {
                current.setAge(current.getAge());
            } else {
                current.setAge(pre.getAge() + current.getAge());
            }
            return current;
        });
        System.out.println(reduceEmployee);
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}
The complete Spark demo code:
import com.google.gson.Gson;
import lombok.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @projectName: spark-demo
 * @package: com.alpha.data
 * @className: SparkDemo
 * @author: nyp
 * @description: TODO
 * @date: 2023/4/27 9:06
 * @version: 1.0
 */
public class SparkDemo {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
        Dataset<Row> reader = session.read().text("F:/test.txt");
        // reader.show();
        /**
         * +-----------------------+
         * |                  value|
         * +-----------------------+
         * |张三,20,研发部,普通员工|
         * |李四,31,研发部,普通员工|
         * |李丽,36,财务部,普通员工|
         * |张伟,38,研发部,经理|
         * |杜航,25,人事部,普通员工|
         * |周歌,28,研发部,普通员工|
         * +-----------------------+
         */
        // for the local demo only; in a real distributed environment this gson instance raises serialization issues
        // code outside operators runs on the driver
        // code inside operators runs on executors, i.e. on different cluster nodes
        Gson gson = new Gson();
        // a: outside the operator, driver side
        Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
            @Override
            public Employee call(Row row) throws Exception {
                // b: inside the operator, executor side
                Employee employee = null;
                try {
                    // gson.fromJson(...); using gson here involves serialization issues
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                } catch (Exception exception) {
                    // log the error here
                    // a streaming job has to run 24/7; any dirty upstream record could fail the task and kill the job, so catch exceptions here
                    exception.printStackTrace();
                }
                return employee;
            }
        }, Encoders.bean(Employee.class));
        // employeeDataset.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通员工|张三|
         * | 31|    研发部|普通员工|李四|
         * | 36|    财务部|普通员工|李丽|
         * | 38|    研发部|    经理|张伟|
         * | 25|    人事部|普通员工|杜航|
         * | 28|    研发部|普通员工|周歌|
         */
        Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    try {
                        List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                        Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                        employeeList.add(employee);
                    } catch (Exception exception) {
                        // log the error here
                        // a streaming job has to run 24/7; any dirty upstream record could fail the task and kill the job, so catch exceptions here
                        exception.printStackTrace();
                    }
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
        // employeeDataset2.show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 20|    研发部|普通员工|张三|
         * | 31|    研发部|普通员工|李四|
         * | 36|    财务部|普通员工|李丽|
         * | 38|    研发部|    经理|张伟|
         * | 25|    人事部|普通员工|杜航|
         * | 28|    研发部|普通员工|周歌|
         * +---+----------+--------+----+
         */
        Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
            @Override
            public Iterator<Employee> call(Row row) throws Exception {
                List<Employee> employeeList = new ArrayList<>();
                try {
                    List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);
                    Employee employee2 = new Employee(list.get(0) + "_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee2);
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
                return employeeList.iterator();
            }
        }, Encoders.bean(Employee.class));
        // employeeDatasetFlatmap.show();
        /**
         * +---+----------+--------+------+
         * |age|department|   level|  name|
         * +---+----------+--------+------+
         * | 20|    研发部|普通员工|  张三|
         * | 20|    研发部|普通员工|张三_2|
         * | 31|    研发部|普通员工|  李四|
         * | 31|    研发部|普通员工|李四_2|
         * | 36|    财务部|普通员工|  李丽|
         * | 36|    财务部|普通员工|李丽_2|
         * | 38|    研发部|    经理|  张伟|
         * | 38|    研发部|    经理|张伟_2|
         * | 25|    人事部|普通员工|  杜航|
         * | 25|    人事部|普通员工|杜航_2|
         * | 28|    研发部|普通员工|  周歌|
         * | 28|    研发部|普通员工|周歌_2|
         * +---+----------+--------+------+
         */
        RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
        // number of employees per department
        // datasetGroupBy.count().show();
        /**
         * +----------+-----+
         * |department|count|
         * +----------+-----+
         * |    财务部|    1|
         * |    人事部|    1|
         * |    研发部|    4|
         * +----------+-----+
         */
        // average age per department
        // datasetGroupBy.avg("age").withColumnRenamed("avg(age)", "avgAge").show();
        /**
         * +----------+------+
         * |department|avgAge|
         * +----------+------+
         * |    财务部|  36.0|
         * |    人事部|  25.0|
         * |    研发部| 29.25|
         * +----------+------+
         */
        KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
            @Override
            public String call(Employee employee) throws Exception {
                // return the grouping key; here we group by department
                return employee.getDepartment();
            }
        }, Encoders.STRING());
        keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
            @Override
            public Object call(Object key, Iterator iterator) throws Exception {
                System.out.println("key = " + key);
                while (iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
                return iterator;
                /**
                 * key = 人事部
                 * SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
                 * key = 研发部
                 * SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
                 * SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
                 * SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
                 * SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
                 * key = 财务部
                 * SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
                 */
            }
        }, Encoders.bean(Iterator.class))
        .show(); // show() here only triggers the computation
        Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
            @Override
            public Employee call(Employee t1, Employee t2) throws Exception {
                // depending on the Spark version you may need to check whether t1 is null
                t2.setAge(t1.getAge() + t2.getAge());
                return t2;
            }
        });
        System.out.println(datasetReduce);
        Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
        System.out.println(employee);
        // SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
        employeeDataset.registerTempTable("table");
        session.sql("select * from table where age > 30 order by age desc limit 3").show();
        /**
         * +---+----------+--------+----+
         * |age|department|   level|name|
         * +---+----------+--------+----+
         * | 38|    研发部|    经理|张伟|
         * | 36|    财务部|普通员工|李丽|
         * | 31|    研发部|普通员工|李四|
         * +---+----------+--------+----+
         */
    }

    @Getter
    @Setter
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class Employee implements Serializable {
        private String name;
        private Integer age;
        private String department;
        private String level;
    }
}
The Spark Maven dependencies; remove the spark-streaming / Kafka dependencies yourself if you do not need them.
The pom.xml dependency section:
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.12.15</scala.version>
    <spark.version>3.2.0</spark.version>
    <encoding>UTF-8</encoding>
</properties>
<dependencies>
    <!-- scala dependency -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- spark dependency -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
        <scope>provided</scope>
    </dependency>
    <!--<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.7</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.34</version>
    </dependency>
</dependencies>