Spark Review Notes (4): A Complete Guide to SparkSQL Operations That Process Massive Data in Seconds. Can It Become Your Go-To Work Reference?

Spark Study Notes

Preface: Today is day 4 of my Spark review! This post organizes the SparkSQL operations commonly used at work and demonstrates several enterprise-level cases. I hope it helps!

Tips: "Sharing is the source of happiness 💧. My blog holds not only an ocean of knowledge 🌊 but also plenty of positive energy 💪. Come share the joy with me 😊!

If you like my blog, remember to leave a like ❤️ and a follow! Your support keeps me creating!"


Table of Contents

  • Spark Study Notes
    • II. SparkSQL Basics
      • 1. Introduction to SparkSQL
      • 2. Creating a SparkSession
      • 3. Three Ways to Convert an RDD to a DataFrame
        • (1) toDF()
        • (2) spark.createDataFrame(schema)
        • (3) spark.createDataFrame(rdd, schema)
      • 4. Pandas to DF
      • 5. Three Common Ways to Read Data in Spark
        • (1) spark.read.text()
        • (2) spark.read.format()
        • (3) DSL and SQL
      • 6. Two Ways to Do WordCount
        • (1) The DSL approach
        • (2) The SQL approach
      • 7. Movie-Data Case Study
        • (1) Pre-cleaning the data
        • (2) Hands-on workflow
      • 8. Data-Cleaning Workflow
        • (1) Deduplication
        • (2) Handling null values
      • 9. Flower-Selection Case Study
      • (All datasets for this section are available in my resource download area; interested readers can grab them there: the complete SparkSQL case-study dataset collection.)

II. SparkSQL Basics

1. Introduction to SparkSQL

Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and serves as a distributed SQL query engine.

The four key features of SparkSQL

• 1 - Easy integration: mix SQL queries seamlessly with Spark programs, using APIs in Java, Scala, Python, R, and more (see the sketch after this list).
• 2 - Unified data access: connect to any data source in the same way.
• 3 - Hive compatibility: supports Hive HQL syntax and is compatible with Hive (metastore, SQL syntax, UDFs, serialization/deserialization mechanisms).
• 4 - Standard connectivity: connect through industry-standard JDBC or ODBC.
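
As a quick illustration of feature 1, here is a minimal sketch of mixing programmatic DataFrame construction with a SQL query, assuming an existing SparkSession named spark (the view name people is purely illustrative):

    # build a DataFrame in ordinary code, expose it to SQL, then query it in SQL
    df = spark.range(3).withColumnRenamed("id", "age")
    df.createOrReplaceTempView("people")
    spark.sql("select age + 1 as next_age from people").show()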

2. Creating a SparkSession

• 1 - spark.read.text(): read a file
• 2 - fileDF.printSchema(): print the field names and types
• 3 - fileDF.show(truncate=False): display strings without truncation
• 4 - fileDF.rdd.collect(): view the data as an RDD
_01SparkSessionCreate.py

    # -*- coding: utf-8 -*-
    # Program function: learn to create a SparkSession
    from pyspark.sql import SparkSession
    from pyspark import SparkConf

    if __name__ == '__main__':
        # TODO 1 - set up the Spark configuration for the SparkSession
        conf = SparkConf().setAppName("sparksession").setMaster("local[*]")
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        # TODO 2 - derive the SparkContext from the SparkSession
        sc = spark.sparkContext
        # TODO 3 - read a file
        fileDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
        # TODO 4 - check how many rows the data has
        print("fileDF counts value is:{}".format(fileDF.count()))  # fileDF counts value is:2
        fileDF.printSchema()  # field names and field types
        # root
        # |-- value: string (nullable = true)
        fileDF.show(truncate=False)
        # +------------------------+
        # |value                   |
        # +------------------------+
        # |hello you Spark Flink   |
        # |hello me hello she Spark|
        # +------------------------+
        print(fileDF.rdd.collect())
        # [Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]

        # TODO 5 - stop the session
        spark.stop()

    fileDF counts value is:2
    root
     |-- value: string (nullable = true)
    +------------------------+
    |value                   |
    +------------------------+
    |hello you Spark Flink   |
    |hello me hello she Spark|
    +------------------------+
    [Row(value='hello you Spark Flink'), Row(value='hello me hello she Spark')]
                      

3. Three Ways to Convert an RDD to a DataFrame

(1) toDF()

1 - Without arguments:

• alias('age'): rename a column
• Row(name=zhangsan, age=18): build a row
• DSL style: df.select(<expression>).show()
• SQL style: spark.sql(<SQL statement>).show(); remember to create a temp view first with df.createOrReplaceTempView("t_table")
_01rddToDataFrameWay1.py

    # -*- coding: utf-8 -*-
    # Program function: first way to convert an RDD to a DF
    '''
    1 - prepare the SparkSession context
    2 - read the data with sc.textFile()
    3 - wrap each line in a Row object, e.g. Row(name=zhangsan, age=18)
    4 - create the DataFrame with toDF()
    5 - inspect the schema with printSchema
    6 - display the data with show
    '''
    from pyspark.sql import SparkSession
    from pyspark.sql.types import Row

    if __name__ == '__main__':
        # 1 - prepare the SparkSession context
        spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data with sc.textFile()
        rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
        file_map_rdd = rdd_file.map(lambda record: record.split(","))
        # print(file_map_rdd.collect())
        # 3 - wrap each line in a Row object: Row(name=zhangsan, age=18)
        df = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1]))).toDF()
        # 4 - SparkSQL offers two query styles
        # 4-1 first style: DSL, the domain-specific query language, df.select / df.filter
        print("===================df.select DSL-=========================")
        df.select("name").show()
        df.select(["name", "age"]).show()
        df.select(df.name, (df.age + 10).alias('age')).show()
        # 4-2 second style: write plain SQL
        print("===================spark.sql- SQL=========================")
        df.createOrReplaceTempView("t_table")
        spark.sql("select * from t_table").show()
        spark.sql("select name from t_table").show()
        spark.sql("select name,age from t_table").show()
        spark.sql("select name,age + 10 from t_table").show()
        # 5 - inspect the schema with printSchema
        df.printSchema()
        spark.sql("desc t_table").show()
        # 6 - display the data with show
        df.show()
        sc.stop()

    ===================df.select DSL-=========================
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 39|
    |   Andy| 40|
    | Justin| 29|
    +-------+---+
    ===================spark.sql- SQL=========================
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
    +-------+--------------------------+
    |   name|(age + CAST(10 AS BIGINT))|
    +-------+--------------------------+
    |Michael|                        39|
    |   Andy|                        40|
    | Justin|                        29|
    +-------+--------------------------+
    root
     |-- name: string (nullable = true)
     |-- age: long (nullable = true)
    +--------+---------+-------+
    |col_name|data_type|comment|
    +--------+---------+-------+
    |    name|   string|   null|
    |     age|   bigint|   null|
    +--------+---------+-------+
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+

2 - With arguments:

_05rddToDataFrameWay4.py

    # -*- coding: utf-8 -*-
    # Program function: convert an RDD to a DF with toDF and explicit column names
    '''
    1 - prepare the SparkSession context
    2 - read the data with sc.textFile()
    3 - pass a list of column names to toDF()
    4 - inspect the schema with printSchema
    5 - display the data with show
    '''
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *

    if __name__ == '__main__':
        # 1 - prepare the SparkSession context
        spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data with sc.textFile()
        rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
        file_map_rdd = rdd_file.map(lambda record: record.split(","))
        # toDF accepts a List[str] of column names; both columns stay strings here
        df = file_map_rdd.toDF(["name", "age"])
        df.printSchema()
        df.show()
        spark.stop()
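
Since the RDD elements are lists of strings and toDF received only column names, both columns come out as strings. A minimal follow-up sketch for fixing the type afterwards with cast (reusing the df from the script above; this is an illustration, not part of the original script):

    from pyspark.sql.functions import col

    # cast the string column "age" to an integer column in place
    typed_df = df.withColumn("age", col("age").cast("int"))
    typed_df.printSchema()  # age now shows as integer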
                        
(2) spark.createDataFrame(schema)
• scheme_people: the RDD of Row objects built from each line
_02rddToDataFrameWay1.py

    # -*- coding: utf-8 -*-
    # Program function: convert an RDD to a DF via spark.createDataFrame
    '''
    1 - prepare the SparkSession context
    2 - read the data with sc.textFile()
    3 - wrap each line in a Row object, e.g. Row(name=zhangsan, age=18)
    4 - create the DataFrame with spark.createDataFrame(schema)
    5 - inspect the schema with printSchema
    6 - display the data with show
    '''
    from pyspark.sql import SparkSession
    from pyspark.sql.types import Row

    if __name__ == '__main__':
        # 1 - prepare the SparkSession context
        spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data with sc.textFile()
        rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
        file_map_rdd = rdd_file.map(lambda record: record.split(","))
        # print(file_map_rdd.collect())
        # 3 - wrap each line in a Row object: Row(name=zhangsan, age=18)
        scheme_people = file_map_rdd.map(lambda line: Row(name=line[0], age=int(line[1])))
        # 4 - create the DataFrame with spark.createDataFrame(schema)
        df = spark.createDataFrame(scheme_people)
        # 5 - inspect the schema with printSchema
        df.printSchema()
        # 6 - display the data with show
        df.show()
        spark.stop()

    root
     |-- name: string (nullable = true)
     |-- age: long (nullable = true)
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
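
A Row behaves like both a named tuple and a dict, which is convenient once results are collected back to the driver. A small illustrative sketch, assuming the df built above:

    first = df.first()               # Row(name='Michael', age=29)
    print(first.name, first["age"])  # attribute-style and key-style access
    print(first.asDict())            # {'name': 'Michael', 'age': 29}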
                            
                          
(3) spark.createDataFrame(rdd, schema)

1 - When the field types differ:

• peoplerdd: just extract the raw values as tuples
• schema: build the schema with StructType and StructField
_03rddToDataFrameWay3.py

    # -*- coding: utf-8 -*-
    # Program function: convert an RDD to a DF with an explicit schema
    '''
    1 - prepare the SparkSession context
    2 - read the data with sc.textFile()
    3 - build the schema with StructType and StructField
    4 - create the DataFrame with spark.createDataFrame(rdd, schema)
    5 - inspect the schema with printSchema
    6 - display the data with show
    '''
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *

    if __name__ == '__main__':
        # 1 - prepare the SparkSession context
        spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data with sc.textFile()
        rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
        file_map_rdd = rdd_file.map(lambda record: record.split(","))
        # print(file_map_rdd.collect())
        # 3 - extract plain tuples; the schema below names and types them
        peoplerdd = file_map_rdd.map(lambda line: (line[0], int(line[1].strip())))
        # build the schema with StructType and StructField
        schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)])
        # the third argument defaults to True, meaning the field is nullable

        # 4 - create the DataFrame with spark.createDataFrame(rdd, schema)
        df = spark.createDataFrame(peoplerdd, schema)
        # 5 - inspect the schema with printSchema
        df.printSchema()
        # root
        # |-- name: string (nullable = true)
        # |-- age: integer (nullable = true)
        # 6 - display the data with show
        df.show()
        spark.stop()

    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = true)
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
                              
                            

2 - When all fields share the same type:

_04rddToDataFrameWay31.py

    # -*- coding: utf-8 -*-
    # Program function: build the schema from a comma-separated field-name string
    '''
    1 - prepare the SparkSession context
    2 - read the data with sc.textFile()
    3 - build the schema with StructType and StructField
    4 - create the DataFrame with spark.createDataFrame(rdd, schema)
    5 - inspect the schema with printSchema
    6 - display the data with show
    '''
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *

    if __name__ == '__main__':
        # 1 - prepare the SparkSession context
        spark = SparkSession.builder.master("local[*]").appName("testPi").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data with sc.textFile()
        rdd_file = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.txt")
        file_map_rdd = rdd_file.map(lambda record: record.split(","))
        # print(file_map_rdd.collect())
        # 3 - keep every field as a string, since the shared type below is StringType
        # (converting age to int here would fail schema verification)
        peoplerdd = file_map_rdd.map(lambda line: (line[0], line[1].strip()))

        # build the schema by splitting a comma-separated field-name string
        schemaName = "name,age"
        split_ = [StructField(scheme, StringType(), True) for scheme in schemaName.split(",")]
        schema = StructType(split_)

        # 4 - create the DataFrame with spark.createDataFrame(rdd, schema)
        df = spark.createDataFrame(peoplerdd, schema)
        # 5 - inspect the schema with printSchema
        df.printSchema()
        # root
        # |-- name: string (nullable = true)
        # |-- age: string (nullable = true)
        # 6 - display the data with show
        df.show()
        spark.stop()
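
As a side note, spark.createDataFrame also accepts a schema written as a DDL-formatted string, which saves the StructType boilerplate when the types are simple. A minimal sketch, assuming peoplerdd holds (str, int) tuples as in the first variant above:

    # the schema can be a DDL string instead of a StructType
    df = spark.createDataFrame(peoplerdd, "name string, age int")
    df.printSchema()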
                            

4. Pandas to DF

• spark.createDataFrame(pandas_df)
pandasToDF.py

    # -*- coding: utf-8 -*-
    # Program function: convert a pandas DataFrame to a Spark DF
    import pandas as pd
    from pyspark.sql import SparkSession
    from datetime import datetime, date

    if __name__ == '__main__':
        spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        pandas_df = pd.DataFrame({
            'a': [1, 2, 3],
            'b': [2., 3., 4.],
            'c': ['string1', 'string2', 'string3'],
            'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
            'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
        })
        print(pandas_df)
        #    a    b        c           d                   e
        # 0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
        # 1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
        # 2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00
        print(pandas_df.shape)  # (3, 5)
        # print(pandas_df.values)
        # createDataFrame accepts an RDD, a list, or a pandas.DataFrame
        df_pandas = spark.createDataFrame(pandas_df)
        df_pandas.printSchema()
        # root
        # |-- a: long (nullable = true)
        # |-- b: double (nullable = true)
        # |-- c: string (nullable = true)
        # |-- d: date (nullable = true)
        # |-- e: timestamp (nullable = true)
        df_pandas.show()
        print(df_pandas.rdd.collect())
        spark.stop()
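
For larger frames, the pandas conversion can be accelerated with Apache Arrow. A hedged sketch below; the config key is the Spark 3.x spelling, and the pyarrow package must be installed for it to take effect:

    # enable Arrow-based columnar transfer between pandas and Spark (requires pyarrow)
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    df_pandas = spark.createDataFrame(pandas_df)  # conversion now goes through Arrow
    round_trip = df_pandas.toPandas()             # Arrow speeds up toPandas() as well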
                              

5. Three Common Ways to Read Data in Spark

(1) spark.read.text()
(2) spark.read.format()
• 1 - reading CSV data
  • option("header", True): keep the header row
  • option("sep", ";"): the separator is ;
  • option("inferSchema", True): infer each column's data type automatically
  • load(<path>)
• 2 - reading JSON data
• 3 - reading Parquet data
_01_readCsvData.py

    # -*- coding: utf-8 -*-
    # Program function: read CSV data
    # CSV is delimiter-separated text (comma by default)
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # read CSV data
        csv_data = spark.read.format("csv") \
            .option("header", True) \
            .option("sep", ";") \
            .option("inferSchema", True) \
            .load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
        csv_data.printSchema()
        csv_data.show()
        print(type(csv_data))  # <class 'pyspark.sql.dataframe.DataFrame'>
        # read JSON data
        json__load = spark.read.format("json").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.json")
        json__load.printSchema()
        json__load.show()
        # read Parquet data
        parquet__load = spark.read.format("parquet").load("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/users.parquet")
        parquet__load.printSchema()
        parquet__load.show()
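
spark.read also exposes format-specific shorthands that fold the options into keyword arguments. A minimal equivalent sketch, using the same paths as above:

    # shorthand readers equivalent to the format()/option()/load() chains above
    csv_data = spark.read.csv("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.csv",
                              header=True, sep=";", inferSchema=True)
    json_data = spark.read.json("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/people.json")
    parquet_data = spark.read.parquet("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/sql/users.parquet")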
                                  
(3) DSL and SQL
• 1 - DSL
  • inspect data: show, printSchema
  • access columns: col, column, pandas-style syntax
  • aggregate: groupby, count, orderBy, agg
  • rename: withColumnRenamed
• 2 - SQL
  • create a temp view: createOrReplaceTempView
  • spark.sql(<SQL statement>)
    # -*- coding: utf-8 -*-
    # Program function: DSL & SQL
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        # 1 - prepare the environment
        spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data
        dataDF = spark.read.format("csv") \
            .option("header", "true") \
            .option("inferSchema", True) \
            .option("sep", ";") \
            .load("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/people.csv")
        # 3 - inspect the data
        dataDF.show(2, truncate=False)
        dataDF.printSchema()
        # 4 - run DSL operations
        from pyspark.sql.functions import col, column
        # select the name column, several equivalent ways
        dataDF.select("name").show()
        dataDF.select(col("name")).show()
        # dataDF.select(column("name")).show()
        dataDF.select(dataDF.name).show()
        dataDF.select(dataDF["name"]).show()
        # select the name and age columns
        dataDF.select(["name", "age"]).show()
        dataDF.select(col("name"), col("age")).show()
        dataDF.select(dataDF["name"], col("age")).show()
        dataDF.select(dataDF.name, col("age")).show()
        # filter the rows where age is greater than 30
        dataDF.filter("age >30").show()
        dataDF.filter(dataDF["age"] > 30).show()
        dataDF.filter(col("age") > 30).show()
        # groupBy aggregation
        dataDF.groupby("age").count().orderBy("count").withColumnRenamed("count", "countBig").show()
        from pyspark.sql import functions as F
        dataDF.groupby("age").agg(F.count(dataDF.age)).show()
        dataDF.groupby("age").agg({"age": "count"}).show()
        # SQL
        dataDF.createOrReplaceTempView("t_table")
        spark.sql("select name from t_table").show()
        spark.sql("select name,age from t_table").printSchema()
        spark.sql("select Name,age from t_table").printSchema()  # SQL identifiers are case-insensitive by default
        # root
        # |-- Name: string (nullable = true)
        # |-- age: integer (nullable = true)
        spark.sql("select name ,age from t_table where age>30").show()
        spark.sql("select name ,age from t_table order by age limit 2").show()
        spark.stop()
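
A useful middle ground between the two styles is selectExpr, which embeds SQL expressions inside a DSL chain. A small sketch, reusing the dataDF from the script above:

    # SQL expressions inside a DSL call: equivalent to "select name, age + 10 as age10"
    dataDF.selectExpr("name", "age + 10 as age10").show()
    # expr() achieves the same thing inside select()
    from pyspark.sql.functions import expr
    dataDF.select("name", expr("age + 10").alias("age10")).show()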
                                        

6. Two Ways to Do WordCount

(1) The DSL approach
• 1 - the explode function: flattens an array into one row per element
• 2 - withColumn: replaces the column if it already exists, otherwise adds it
    # -*- coding: utf-8 -*-
    # Program function: DSL wordcount
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        # 1 - prepare the environment
        spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data
        dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
        # 3 - inspect the data
        dataDF.printSchema()
        # root
        # |-- value: string (nullable = true)
        # 4 - wordcount
        from pyspark.sql import functions as F
        # explode flattens the array produced by split into one row per word
        # withColumn replaces the column if it already exists, otherwise adds it
        dataExplodeDF = dataDF.withColumn("words", F.explode(F.split(F.col("value"), " ")))
        dataExplodeDF.show(truncate=False)
        # +------------------------+-----+
        # |value                   |words|
        # +------------------------+-----+
        # |hello you Spark Flink   |hello|
        # |hello you Spark Flink   |you  |
        # |hello you Spark Flink   |Spark|
        # |hello you Spark Flink   |Flink|
        # |hello me hello she Spark|hello|
        # |hello me hello she Spark|me   |
        # |hello me hello she Spark|hello|
        # |hello me hello she Spark|she  |
        # |hello me hello she Spark|Spark|
        # +------------------------+-----+
        dataExplodeDF.groupby("words").count().orderBy("count", ascending=False).show()
        # +-----+-----+
        # | words | count |
        # +-----+-----+
        # | hello | 3 |
        # | Spark | 2 |
        # | me | 1 |
        # | Flink | 1 |
        # | you | 1 |
        # | she | 1 |
        # +-----+-----+
                                          
(2) The SQL approach
• 1 - split(value, ' '): the split function
• 2 - explode: the explode function, typically combined with split
    # -*- coding: utf-8 -*-
    # Program function: SQL wordcount
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        # 1 - prepare the environment
        spark = SparkSession.builder.appName("readData").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # 2 - read the data
        dataDF = spark.read.text("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/words.txt")
        # 3 - inspect the data
        dataDF.printSchema()
        # root
        # |-- value: string (nullable = true)
        # 4 - wordcount
        dataDF.createOrReplaceTempView("t_table")
        spark.sql("select split(value,' ') from t_table").show(truncate=False)
        # +------------------------------+
        # |split(value,  , -1)           |
        # +------------------------------+
        # |[hello, you, Spark, Flink]    |
        # |[hello, me, hello, she, Spark]|
        # +------------------------------+

        spark.sql("select explode(split(value,' ')) as words from t_table").show(truncate=False)
        # +-----+
        # |words|
        # +-----+
        # |hello|
        # |you  |
        # |Spark|
        # |Flink|
        # |hello|
        # |me   |
        # |hello|
        # |she  |
        # |Spark|
        # +-----+

        spark.sql("""
            select words,count(1) as count from
            (select explode(split(value,' ')) as words from t_table) w
            group by words
            order by count desc
        """).show(truncate=False)
        # +-----+-----+
        # | words | count |
        # +-----+-----+
        # | hello | 3 |
        # | Spark | 2 |
        # | you | 1 |
        # | me | 1 |
        # | Flink | 1 |
        # | she | 1 |
        # +-----+-----+
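
Real text often contains repeated spaces that would produce empty tokens. A slightly more defensive variant of the query above, as a hedged sketch (splitting on one-or-more spaces and dropping empty tokens; assumes the same t_table view):

    spark.sql("""
        select words, count(1) as cnt
        from (select explode(split(value, ' +')) as words from t_table) w
        where words != ''
        group by words
        order by cnt desc
    """).show(truncate=False)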
                                            

7. Movie-Data Case Study

(1) Pre-cleaning the data
• 1 - read, then convert with toDF
  • filter: keep lines that are non-empty and split into exactly the expected number of fields
  • map: perform the actual split
  • map: convert each field to its target type
  • toDF: assign the field names
    # -*- coding: utf-8 -*-
    # Program function: ingesting the movie dataset
    '''
    * Data ingestion steps:
    * 1 - inspect the data format and pick a suitable read method
    * 2 - prepare the environment
    * 3 - read the data file
    * 4 - convert to a DF
    * 5 - run the statistics
    '''
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    import re

    if __name__ == '__main__':
        # TODO * data ingestion:
        # TODO * 1 - inspect the data format -- conclusion: read through the RDD API and filter, i.e. sc.textFile
        # TODO * 2 - prepare the environment
        conf = SparkConf().setAppName("testIris").setMaster("local[*]")
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # TODO * 3 - read the data file
        movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
        print("movies count is:" + str(movies_rdd.count()))
        # TODO data cleaning
        moviesDF = movies_rdd \
            .filter(lambda line: (len(line.strip()) > 0 and (len(re.split(r"\s+", line.strip())) == 4))) \
            .map(lambda line: re.split(r"\s+", line.strip())) \
            .map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3]))) \
            .toDF(["userid", "itemid", "rating", "timestamp"])
        moviesDF.show()
        moviesDF.printSchema()
        # root
        # |-- userid: long (nullable = true)
        # |-- itemid: long (nullable = true)
        # |-- rating: double (nullable = true)
        # |-- timestamp: long (nullable = true)
        # TODO * 4 - convert to a DF
        # TODO * 5 - run the statistics
                                                
• 2 - read, then define the schema yourself
  • define the fields with StructType and StructField
  • createDataFrame(data, schema)
    # -*- coding: utf-8 -*-
    # Program function: ingesting the movie dataset with a hand-built schema
    '''
    * Data ingestion steps:
    * 1 - inspect the data format and pick a suitable read method
    * 2 - prepare the environment
    * 3 - read the data file
    * 4 - convert to a DF
    * 5 - run the statistics
    '''
    from pyspark.sql import SparkSession
    from pyspark import SparkConf
    import re

    if __name__ == '__main__':
        # TODO * data ingestion:
        # TODO * 1 - inspect the data format -- conclusion: read through the RDD API and filter, i.e. sc.textFile
        # TODO * 2 - prepare the environment
        conf = SparkConf().setAppName("testIris").setMaster("local[*]")
        spark = SparkSession.builder.config(conf=conf).getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")
        # TODO * 3 - read the data file
        movies_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-100k/u.data")
        print("movies count is:" + str(movies_rdd.count()))
        # TODO data cleaning
        moviesData = movies_rdd \
            .filter(lambda line: (len(line.strip()) > 0 and (len(re.split(r"\s+", line.strip())) == 4))) \
            .map(lambda line: re.split(r"\s+", line.strip())) \
            .map(lambda line: (int(line[0]), int(line[1]), float(line[2]), int(line[3])))
        from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
        # userid must be IntegerType here because the map above converts it to int;
        # declaring it StringType would fail schema verification
        movies_schema = StructType([StructField("userid", IntegerType(), True),
                                    StructField("itemid", IntegerType(), False),
                                    StructField("rating", FloatType(), False),
                                    StructField("timestamp", IntegerType(), False),
                                    ])
        moviesDF = spark.createDataFrame(moviesData, movies_schema)
        moviesDF.show()
        moviesDF.printSchema()
        # root
        # |-- userid: integer (nullable = true)
        # |-- itemid: integer (nullable = false)
        # |-- rating: float (nullable = false)
        # |-- timestamp: integer (nullable = false)
        # TODO * 4 - convert to a DF
        # TODO * 5 - run the statistics
(2) Hands-on workflow
• 1 - set the shuffle partitions: this reduces the data transfer and disk IO incurred during shuffle and thus improves performance. However, setting the partition count too low can lead to unevenly distributed data and hurt performance, so choose it based on data volume, cluster resources, and the needs of the job.
• 2 - write the results out
  • coalesce: reduce the number of partitions
  • write: the write operation
  • csv: the target path is a directory and must not be created beforehand
  • mysql: one format and one mode, five options, one save
    # -*- coding: utf-8 -*-
    # Program function: run the required statistics on the movie ratings dataset
    '''
    * 1 - prepare the SparkSession
    * 2 - read the ratings.dat data file
    * 3 - convert to a DF
    * 4 - analysis in DSL style
    * 5 - analysis in SQL style
    * 6 - write the results out
    * 7 - spark.stop
    '''
    import time
    from pyspark.sql import SparkSession
    from pyspark.sql.types import Row
    import os

    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3//bin/python"
    # when several Python versions coexist, failing to pin one is likely to cause errors
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

    if __name__ == '__main__':
        # *1 - prepare the SparkSession
        spark = SparkSession \
            .builder \
            .appName("testRatings") \
            .master("local[3]") \
            .config("spark.sql.shuffle.partitions", "2") \
            .getOrCreate()
        sc = spark.sparkContext
        sc.setLogLevel("WARN")  # the default INFO level prints a lot; the Web UI runs on port 4040
        # *2 - read the ratings.dat data file
        ratingsRDD = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/ml-1m/ratings.dat")
        # basic preprocessing of the data
        # *3 - convert to a DF
        ratingsDF = ratingsRDD \
            .filter(lambda line: len(line.strip()) > 0 and len(line.strip().split("::")) == 4) \
            .map(lambda line: line.strip().split("::")) \
            .map(lambda p: Row(userid=int(p[0]), moviesId=int(p[1]), ratings=float(p[2]), timestamp=int(p[3]))) \
            .toDF()
        # *5 - analysis in SQL style
        # find the Top 10 movies: highest average rating, with more than 2000 ratings each
        # ratingsDF.printSchema()
        # ratingsDF.show(2)
        ratingsDF.createOrReplaceTempView("table_view")
        spark.sql("""
            select moviesId,round(avg(ratings),2) as cnt_rating,count(moviesId) as cnt_movies
            from table_view
            group by moviesId
            having cnt_movies>2000
            order by cnt_rating desc,cnt_movies desc
            limit 10
        """).show()
        # +--------+----------+----------+
        # |moviesId|cnt_rating|cnt_movies|
        # +--------+----------+----------+
        # |     318|      4.55|      2227|
        # |     858|      4.52|      2223|
        # |     527|      4.51|      2304|
        # |    1198|      4.48|      2514|
        # *4 - analysis in DSL style
        import pyspark.sql.functions as F
        # resultDF = ratingsDF \
        #     .select("moviesId", "ratings") \
        #     .groupby("moviesId") \
        #     .agg(F.round(F.avg("ratings"), 2).alias("cnt_rating"), F.count("moviesId").alias("cnt_movies")) \
        #     .filter("cnt_movies >2000") \
        #     .orderBy(["cnt_rating", "cnt_movies"], ascending=[0, 0]) \
        #     .limit(10)
        # resultDF.show()
        # +--------+----------+----------+
        # |moviesId|cnt_rating|cnt_movies|
        # +--------+----------+----------+
        # |     318|      4.55|      2227|
        # |     858|      4.52|      2223|
        # |     527|      4.51|      2304|
        # *6 - write the results to csv
        # resultDF\
        #     .coalesce(1)\
        #     .write\
        #     .csv("/export/data/pyspark_workspace/PySpark-SparkSQL_3.1.2/data/output1")
        # *6 - write the results to mysql
        # if the bigdata database does not exist yet, create it first: create database bigdata;
        # resultDF\
        #     .coalesce(1)\
        #     .write\
        #     .format("jdbc") \
        #     .mode("overwrite") \
        #     .option("driver", "com.mysql.jdbc.Driver") \
        #     .option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
        #     .option("dbtable", "bigdata.tb_top10_movies") \
        #     .option("user", "root") \
        #     .option("password", "123456") \
        #     .save()
        print("finished writing to MySQL")
        # *7 - spark.stop
        time.sleep(600)  # keep the app alive so the 4040 Web UI can be inspected
        spark.stop()
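
To verify a JDBC write like the commented-out one above, the same connection options can be used to read the table back. A hedged sketch (the parameters are copied from the write example and assume the same MySQL setup, with the MySQL JDBC driver on the classpath):

    # read the result table back over JDBC to verify the write
    check_df = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://node1:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true") \
        .option("dbtable", "bigdata.tb_top10_movies") \
        .option("user", "root") \
        .option("password", "123456") \
        .load()
    check_df.show()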
                                                        

8. Data-Cleaning Workflow

(1) Deduplication
• 1 - createDataFrame: (dataset, list of column names)
• 2 - dropDuplicates: drops rows that repeat in every column; the subset parameter restricts the check to given columns
• 3 - countDistinct: count distinct values
• 4 - count: plain count
• 5 - monotonically_increasing_id: an auto-increasing id column
                                                          01dataClean.py
# -*- coding: utf-8 -*-
# Program function: data cleaning operations
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
    # TODO * Data ingestion comes first:
    # TODO *1 - Prepare the environment
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO Prepare the dataset
    df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])
    # Remove meaningless duplicates: rows that are complete copies of other rows
    # 1. First, drop records that are identical in every column
    print("=================1-Drop rows duplicated on all columns================")
    df1 = df.dropDuplicates()
    df1.show()

    # 2. Next, drop records whose key columns are all identical (here: every column except id)
    print("=================2-Drop rows duplicated on all columns except id================")
    # print(df1.columns)  # ['id', 'weight', 'height', 'age', 'gender']
    df2 = df1.dropDuplicates(subset=[c for c in df1.columns if c != 'id'])
    df2.show()
    # 3 - Check whether a column contains duplicate values: count id, then count distinct id
    print("=================3-Check a column for duplicate values================")
    import pyspark.sql.functions as F
    df3 = df2.agg(F.count("id").alias("id_Count"), F.countDistinct("id").alias("id_distinct_count"))
    df3.show()
    print("=================4-Monotonically increasing id column================")
    df4 = df2.withColumn("new_id", F.monotonically_increasing_id())
    df4.show()
    # +---+------+------+---+------+-------------+
    # | id|weight|height|age|gender|       new_id|
    # +---+------+------+---+------+-------------+
    # |  5| 133.2|   5.7| 54|     F|  25769803776|
    # |  1| 144.5|   5.9| 33|     M| 171798691840|
    # |  2| 167.2|   5.4| 45|     M| 592705486848|
    # |  3| 124.1|   5.2| 23|     F|1236950581248|
    # |  5| 129.2|   5.3| 42|     M|1365799600128|
    # +---+------+------+---+------+-------------+
                                                          
=================2-Drop rows duplicated on all columns except id================
                                                          +---+------+------+---+------+
                                                          | id|weight|height|age|gender|
                                                          +---+------+------+---+------+
                                                          |  5| 133.2|   5.7| 54|     F|
                                                          |  1| 144.5|   5.9| 33|     M|
                                                          |  2| 167.2|   5.4| 45|     M|
                                                          |  3| 124.1|   5.2| 23|     F|
                                                          |  5| 129.2|   5.3| 42|     M|
                                                          +---+------+------+---+------+
=================3-Check a column for duplicate values================
                                                          +--------+-----------------+
                                                          |id_Count|id_distinct_count|
                                                          +--------+-----------------+
                                                          |       5|                4|
                                                          +--------+-----------------+
=================4-Monotonically increasing id column================
                                                          +---+------+------+---+------+-------------+
                                                          | id|weight|height|age|gender|       new_id|
                                                          +---+------+------+---+------+-------------+
                                                          |  5| 133.2|   5.7| 54|     F|  25769803776|
                                                          |  1| 144.5|   5.9| 33|     M| 171798691840|
                                                          |  2| 167.2|   5.4| 45|     M| 592705486848|
                                                          |  3| 124.1|   5.2| 23|     F|1236950581248|
                                                          |  5| 129.2|   5.3| 42|     M|1365799600128|
                                                          +---+------+------+---+------+-------------+
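Note that monotonically_increasing_id() only guarantees unique, monotonically increasing values: it packs the partition id into the upper bits of a 64-bit integer, which is why the generated ids above jump in huge steps. If consecutive numbering is needed, a window function can be used instead. A minimal sketch (it orders over a single partition, so it is only advisable for small data):

# Sketch: consecutive row numbers via a window function
from pyspark.sql import Window
df5 = df2.withColumn("new_id", F.row_number().over(Window.orderBy("id")))
df5.show()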
                                                            
                                                          
(2) Handling Missing Values
• 1-Row plus a list comprehension and sum: count the missing values in each record
• 2-count: count the non-missing values in each column
• 3-*: apply an operation over all columns
• 4-missing-value ratio of a column: number missing / total number of rows
• 5-splitting trick: the agg aggregation function
• 6-drop rows that have fewer than thresh non-null values: dropna() with the thresh parameter
• 7-drop columns where too many values are missing: in practice, build a new DF that excludes those columns
# -*- coding: utf-8 -*-
# Program function: data cleaning operations
from pyspark.sql import SparkSession
from pyspark import SparkConf
import re
if __name__ == '__main__':
    # TODO * Data ingestion comes first:
    # TODO *1 - Prepare the environment
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # TODO Prepare the dataset
    df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28, 'M', 100000),
        (2, 167.2, 5.4, 45, 'M', None),
        (3, None, 5.2, None, None, None),
        (4, 144.5, 5.9, 33, 'M', None),
        (5, 133.2, 5.7, 54, 'F', None),
        (6, 124.1, 5.2, None, 'F', None),
        (7, 129.2, 5.3, 42, 'M', 76000), ],
        ['id', 'weight', 'height', 'age', 'gender', 'income'])
    # TODO 1: use a list comprehension with sum to count the missing values in each record
    dfmiss_sum = df_miss.rdd.map(lambda row: (row['id'], sum([c is None for c in row])))
    print("diss_Sum value is:", dfmiss_sum.collect())
    # diss_Sum value is: [(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
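    # (Hedged sketch) The same per-row null count can be computed without leaving
    # the DataFrame API by summing isNull() flags over all columns, e.g.:
    #   null_cnt = sum([F.col(c).isNull().cast("int") for c in df_miss.columns])
    #   df_miss.select("id", null_cnt.alias("null_count")).show()
    # (uses the pyspark.sql.functions import introduced just below)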
    # 2. Count the non-missing values in each column
    import pyspark.sql.functions as F
    df_miss.agg(F.count("id").alias("id_Count"),
                F.count("income").alias("income_count"),
                F.count("*").alias("all")).show()
    # +--------+------------+---+
    # |id_Count|income_count|all|
    # +--------+------------+---+
    # |       7|           2|  7|
    # +--------+------------+---+
    # 5 of the 7 income values are null (5/7 ≈ 71% missing); the business rule here is to
    # drop any column that is more than 60% missing, so income is excluded below
    df_miss.agg(*[F.count(t).alias(t + "_non_missing") for t in df_miss.columns if t != 'income']).show()
    # +--------------+------------------+------------------+---------------+------------------+
    # |id_non_missing|weight_non_missing|height_non_missing|age_non_missing|gender_non_missing|
    # +--------------+------------------+------------------+---------------+------------------+
    # |             7|                 6|                 7|              5|                 6|
    # +--------------+------------------+------------------+---------------+------------------+
    # How to compute the missing-value ratio of each column
    # Method: missing ratio = 1 - (non-missing count / total count)
    df_miss.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # |id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # |       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
    # +----------+------------------+--------------+------------------+------------------+------------------+
    # Split out per column; note that F.count(c) / F.count("*") is the *non-missing* share
    df_miss.agg((F.count("weight") / F.count("*")).alias("w_non_missing"),
                (F.count("height") / F.count("*")).alias("h_non_missing")).show()

    # +------------------+-------------+
    # |     w_non_missing|h_non_missing|
    # +------------------+-------------+
    # |0.8571428571428571|          1.0|
    # +------------------+-------------+
    # 3. Drop columns where missing values are too severe
    # In practice: build a new DF that excludes those columns
    df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
    df_miss_no_income.show()
    # +---+------+------+----+------+
    # | id|weight|height| age|gender|
    # +---+------+------+----+------+
    # |  1| 143.5|   5.6|  28|     M|
    # |  2| 167.2|   5.4|  45|     M|
    # |  3|  null|   5.2|null|  null|
    # |  4| 144.5|   5.9|  33|     M|
    # |  5| 133.2|   5.7|  54|     F|
    # |  6| 124.1|   5.2|null|     F|
    # |  7| 129.2|   5.3|  42|     M|
    # +---+------+------+----+------+
    # 4. Drop rows that have fewer than `thresh` non-null values
    df_miss_no_income.dropna(thresh=3).show()

    # +---+------+------+----+------+
    # | id|weight|height| age|gender|
    # +---+------+------+----+------+
    # |  1| 143.5|   5.6|  28|     M|
    # |  2| 167.2|   5.4|  45|     M|
    # |  4| 144.5|   5.9|  33|     M|
    # |  5| 133.2|   5.7|  54|     F|
    # |  6| 124.1|   5.2|null|     F|
    # |  7| 129.2|   5.3|  42|     M|
    # +---+------+------+----+------+
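    # (Hedged aside) dropna also accepts how='any'/'all' and a subset of columns, e.g.:
    #   df_miss_no_income.dropna(how='any', subset=['age']).show()  # drop rows whose age is null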
                                                                
    # 5. Fill missing values with fillna
    # For boolean or categorical columns, a dedicated category such as 'missing' can be used
    # For numeric columns, the mean or the median is a common fill value
    # fillna accepts two kinds of arguments:
    # - a single number or string: every missing value in the DataFrame is filled with that same value
    # - a dict of {column name: fill value}
    # For example, for age we can compute the mean over the non-null values and use it to fill the missing ages
    # means = df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
    # Example: compute the means
    df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).show()
    # +---+------------------+-----------------+----+
    # | id|            weight|           height| age|
    # +---+------------------+-----------------+----+
    # |4.0|140.28333333333333|5.471428571428571|40.4|
    # +---+------------------+-----------------+----+
    # Fill each numeric column's missing values with that column's mean
    print(df_miss_no_income.agg(*[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas())
    #    id      weight    height   age
    # 0  4.0  140.283333  5.471429  40.4
    means = df_miss_no_income.agg(
        *[F.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict(orient="records")[
        0]
    print(means)  # {'id': 4.0, 'weight': 140.28333333333333, 'height': 5.471428571428571, 'age': 40.4}
    # gender needs to be filled with 'missing'
    means["gender"] = "missing"
    print(means)
    # {'id': 4.0, 'weight': 140.28333333333333, 'height': 5.471428571428571, 'age': 40.4, 'gender': 'missing'}
    # `value` must be a mapping from column name (string) to replacement value
    # note: age is an integer column, so the mean 40.4 is truncated to 40 when filled
    df_miss_no_income.fillna(means).show()

    # +---+------------------+------+---+-------+
    # | id|            weight|height|age| gender|
    # +---+------------------+------+---+-------+
    # |  1|             143.5|   5.6| 28|      M|
    # |  2|             167.2|   5.4| 45|      M|
    # |  3|140.28333333333333|   5.2| 40|missing|
    # |  4|             144.5|   5.9| 33|      M|
    # |  5|             133.2|   5.7| 54|      F|
    # |  6|             124.1|   5.2| 40|      F|
    # |  7|             129.2|   5.3| 42|      M|
    # +---+------------------+------+---+-------+
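The comments above note that the median is another common fill value for numeric columns. A minimal sketch of a median-based fill, using the approximate percentile function (assumes Spark ≥ 3.1, which this series uses; percentile_approx(col, 0.5) approximates the median):

# Sketch: fill numeric columns with their (approximate) medians instead of means
medians = df_miss_no_income.agg(
    *[F.percentile_approx(c, 0.5).alias(c)
      for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict(orient="records")[0]
medians["gender"] = "missing"
df_miss_no_income.fillna(medians).show()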
                                                                
                                                                
                                                            

9. Iris Flower Case Study

• 1-Start the SparkSession
• 2-Read the data source
• 3-Convert to a DataFrame
• 4-Run assorted queries
# -*- coding: utf-8 -*-
# Program function:
'''
1-If the data file has no header, read it directly with sc.textFile
    1-1 via Row objects
    1-2 via the StructType approach
2-If the data file has a header, use spark.read.format("csv").option("sep")
'''
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == '__main__':
    # 1-Start the SparkSession
    conf = SparkConf().setAppName("testIris").setMaster("local[*]")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    # 2-Read the data source
    file_rdd = sc.textFile("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/iris.data")
    print("count:", file_rdd.count())
    # count: 150
    # 3-Convert to a DataFrame
    from pyspark.sql.types import Row
    # Keep only non-empty lines that split into exactly 5 fields
    # (the original chained two lambdas with `and`, which Python evaluates to just
    # the second lambda; a single predicate expresses what was intended)
    irisDF = file_rdd \
        .filter(lambda line: len(line.strip()) > 0 and len(line.strip().split(",")) == 5) \
        .map(lambda line: line.strip().split(",")) \
        .map(lambda x: Row(sepal_length=float(x[0]), sepal_width=float(x[1]), petal_length=float(x[2]),
                           petal_width=float(x[3]), irisclass=x[4])) \
        .toDF()
    # 4-Run assorted queries
                                                                  irisDF.printSchema()
                                                                  irisDF.show()
                                                                  # root
                                                                  #  |-- sepal_length: double (nullable = true)
                                                                  #  |-- sepal_width: double (nullable = true)
                                                                  #  |-- petal_length: double (nullable = true)
                                                                  #  |-- petal_width: double (nullable = true)
                                                                  #  |-- irisclass: string (nullable = true)
                                                                  # 
                                                                  # +------------+-----------+------------+-----------+-----------+
                                                                  # |sepal_length|sepal_width|petal_length|petal_width|  irisclass|
                                                                  # +------------+-----------+------------+-----------+-----------+
                                                                  # |         5.1|        3.5|         1.4|        0.2|Iris-setosa|
                                                                  # |         4.9|        3.0|         1.4|        0.2|Iris-setosa|
                                                                  # |         4.7|        3.2|         1.3|        0.2|Iris-setosa|
                                                                  # |         4.6|        3.1|         1.5|        0.2|Iris-setosa|
                                                                  # |         5.0|        3.6|         1.4|        0.2|Iris-setosa|
                                                                  # |         5.4|        3.9|         1.7|        0.4|Iris-setosa|
                                                                  # |         4.6|        3.4|         1.4|        0.3|Iris-setosa|
                                                                  # |         5.0|        3.4|         1.5|        0.2|Iris-setosa|
                                                                  # |         4.4|        2.9|         1.4|        0.2|Iris-setosa|
                                                                  # |         4.9|        3.1|         1.5|        0.1|Iris-setosa|
                                                                  # |         5.4|        3.7|         1.5|        0.2|Iris-setosa|
                                                                  # |         4.8|        3.4|         1.6|        0.2|Iris-setosa|
                                                                  # |         4.8|        3.0|         1.4|        0.1|Iris-setosa|
                                                                  # |         4.3|        3.0|         1.1|        0.1|Iris-setosa|
                                                                  # |         5.8|        4.0|         1.2|        0.2|Iris-setosa|
                                                                  # |         5.7|        4.4|         1.5|        0.4|Iris-setosa|
                                                                  # |         5.4|        3.9|         1.3|        0.4|Iris-setosa|
                                                                  # |         5.1|        3.5|         1.4|        0.3|Iris-setosa|
                                                                  # |         5.7|        3.8|         1.7|        0.3|Iris-setosa|
                                                                  # |         5.1|        3.8|         1.5|        0.3|Iris-setosa|
                                                                  # +------------+-----------+------------+-----------+-----------+
                                                                  # only showing top 20 rows
                                                                  irisDF.select("sepal_length").show()
                                                                  irisDF.select(irisDF.sepal_length).show()
                                                                  irisDF.select("sepal_length", "sepal_width").show()
                                                                  
                                                                  # +------------+
                                                                  # |sepal_length|
                                                                  # +------------+
                                                                  # |         5.1|
                                                                  # |         4.9|
                                                                  # |         4.7|
                                                                  # |         4.6|
                                                                  # |         5.0|
                                                                  # |         5.4|
                                                                  # |         4.6|
                                                                  # |         5.0|
                                                                  # |         4.4|
                                                                  # |         4.9|
                                                                  # |         5.4|
                                                                  # |         4.8|
                                                                  # |         4.8|
                                                                  # |         4.3|
                                                                  # |         5.8|
                                                                  # |         5.7|
                                                                  # |         5.4|
                                                                  # |         5.1|
                                                                  # |         5.7|
                                                                  # |         5.1|
                                                                  # +------------+
                                                                  # only showing top 20 rows
                                                                  # 
                                                                  # +------------+
                                                                  # |sepal_length|
                                                                  # +------------+
                                                                  # |         5.1|
                                                                  # |         4.9|
                                                                  # |         4.7|
                                                                  # |         4.6|
                                                                  # |         5.0|
                                                                  # |         5.4|
                                                                  # |         4.6|
                                                                  # |         5.0|
                                                                  # |         4.4|
                                                                  # |         4.9|
                                                                  # |         5.4|
                                                                  # |         4.8|
                                                                  # |         4.8|
                                                                  # |         4.3|
                                                                  # |         5.8|
                                                                  # |         5.7|
                                                                  # |         5.4|
                                                                  # |         5.1|
                                                                  # |         5.7|
                                                                  # |         5.1|
                                                                  # +------------+
                                                                  # only showing top 20 rows
                                                                  # 
                                                                  # +------------+-----------+
                                                                  # |sepal_length|sepal_width|
                                                                  # +------------+-----------+
                                                                  # |         5.1|        3.5|
                                                                  # |         4.9|        3.0|
                                                                  # |         4.7|        3.2|
                                                                  # |         4.6|        3.1|
                                                                  # |         5.0|        3.6|
                                                                  # |         5.4|        3.9|
                                                                  # |         4.6|        3.4|
                                                                  # |         5.0|        3.4|
                                                                  # |         4.4|        2.9|
                                                                  # |         4.9|        3.1|
                                                                  # |         5.4|        3.7|
                                                                  # |         4.8|        3.4|
                                                                  # |         4.8|        3.0|
                                                                  # |         4.3|        3.0|
                                                                  # |         5.8|        4.0|
                                                                  # |         5.7|        4.4|
                                                                  # |         5.4|        3.9|
                                                                  # |         5.1|        3.5|
                                                                  # |         5.7|        3.8|
                                                                  # |         5.1|        3.8|
                                                                  # +------------+-----------+
                                                                  # only showing top 20 rows
                                                                  
# Aggregation-based statistics
                                                                  irisDF.groupby("irisclass").count().show()
                                                                  
                                                                  # +---------------+-----+
                                                                  # |      irisclass|count|
                                                                  # +---------------+-----+
                                                                  # | Iris-virginica|   50|
                                                                  # |    Iris-setosa|   50|
                                                                  # |Iris-versicolor|   50|
                                                                  # +---------------+-----+
                                                                  import pyspark.sql.functions as F
                                                                  irisDF.groupby("irisclass").agg(F.count(F.col("irisclass")).alias("variable")).show()
                                                                  
                                                                  # +---------------+--------+
                                                                  # |      irisclass|variable|
                                                                  # +---------------+--------+
                                                                  # | Iris-virginica|      50|
                                                                  # |    Iris-setosa|      50|
                                                                  # |Iris-versicolor|      50|
                                                                  # +---------------+--------+
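# (Hedged extension) the same DSL supports per-class statistics, e.g. average measurements:
#   irisDF.groupby("irisclass").agg(
#       F.avg("sepal_length").alias("avg_sepal_length"),
#       F.avg("petal_length").alias("avg_petal_length")).show()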
                                                                  
# SQL style
                                                                  irisDF.createOrReplaceTempView("table_view")
                                                                  spark.sql("""
                                                                          select irisclass,count(1) as count
                                                                          from table_view
                                                                          group by irisclass
                                                                      """).show()
                                                                  
                                                                  # +---------------+-----+
                                                                  # |      irisclass|count|
                                                                  # +---------------+-----+
                                                                  # | Iris-virginica|   50|
                                                                  # |    Iris-setosa|   50|
                                                                  # |Iris-versicolor|   50|
                                                                  # +---------------+-----+
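The docstring at the top of this script also names a second approach: when the file has a header row, the DataFrame reader can parse the CSV directly, and for the headerless file an explicit StructType schema can be supplied. A minimal sketch of both (iris_with_header.csv is a hypothetical headered copy of the dataset, not part of the downloads):

# Sketch 1: CSV with a header row, parsed directly by the DataFrame reader
irisDF2 = spark.read.format("csv") \
    .option("sep", ",") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/iris_with_header.csv")

# Sketch 2: headerless CSV with an explicit StructType schema
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
schema = StructType([
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("irisclass", StringType(), True),
])
irisDF3 = spark.read.schema(schema).csv("file:///export/data/spark_practice/PySpark-SparkSQL_3.1.2/data/sql/iris.data")
irisDF3.printSchema()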
                                                                  
                                                              

                                                               