欢迎光临
Spark重温笔记(二):快如闪电的大数据计算框架——你真的了解SparkCore的 RDD 吗?(包含企业级搜狗案例和网站点击案例)
   

Spark重温笔记(二):快如闪电的大数据计算框架——你真的了解SparkCore的 RDD 吗?(包含企业级搜狗案例和网站点击案例)

Spark学习笔记

前言:今天是温习 Spark 的第 2 天啦!主要梳理了 Spark 核心数据结构:RDD(弹性分布式数据集),其中包括基于内存计算的 SparkCore 各类技术知识点希望对大家有帮助!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Spark学习笔记
      • 4. RDD简介
        • (1) RDD五大属性总结
        • (3)读取外部数据的两种方式
        • (4) Transformer算子
        • (5)Action算子
        • (6)重要函数
        • (7) WordCount的处理方法
        • (8) CombineByKey的两种方法
        • (9) join操作
        • (10)accumulator累加器
        • (11)广播变量broadcast
        • (12) 搜狗案例
        • (13)网站点击案例
        • (14)网站点击案例

          4. RDD简介

          (本节的所有数据集放在我的资源下载区哦,感兴趣的小伙伴可以自行下载:最全面的SparkCore系列案例数据集)

          (1) RDD五大属性总结
          • 1-分区列表:RDD是由一些列分区组成的
          • 2-计算函数
          • 3-依赖关系:比如reduceByKey依赖于map依赖于flatMap
          • 4-key-value的分区器:默认分区是hash分区,可以变更为range分区等
          • 5-位置优先性: 按照"移动数据不如移动计算"的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算。(数据本地性)
            #### (2) RDD的2种创建方法
            • 1-使用并行化集合,本质上就是将本地集合作为参数传递到sc.pa
            • 2-使用sc.textFile方式读取外部文件系统,包括hdfs和本地文件系统
              _01_createRDD.py
              # -*- coding: utf-8 -*-
              # Program function:创建RDD的两种方式
              '''
              第一种方式:使用并行化集合,本质上就是将本地集合作为参数传递到sc.pa
              第二种方式:使用sc.textFile方式读取外部文件系统,包括hdfs和本地文件系统
              1-准备SparkContext的入口,申请资源
              2-使用rdd创建的第一种方法
              3-使用rdd创建的第二种方法
              4-关闭SparkContext
              '''
              from pyspark import SparkContext,SparkConf
              if __name__ == "__main__":
                  print("===================createRDD==========================")
                  # 1. 准备spark上下文环境
                  conf = SparkConf().setAppName("CreateRDD").setMaster("local[5]")
                  sc = SparkContext(conf=conf)
                  sc.setLogLevel("WARN")
                  # 2.使用rdd创建的第一种方法
                  connection_rdd = sc.parallelize([1,2,3,4,5])
                  print(connection_rdd.collect())
                  # 3.如何使用api获取rdd的分区个数
                  print(connection_rdd.getNumPartitions())
                  # 4.使用rdd创建的第二种方法
                  file_rdd = sc.textFile("D:\PythonProject\Bigdata_Pyspark3.1.2\PySpark-SparkCore_3.1.2\data\words.txt")
                  print(file_rdd.collect())
                  print(file_rdd.getNumPartitions())
                  # 5. 关闭SparkContext
                  sc.stop()
              

              (3)读取外部数据的两种方式
              • 1-sc.textFile:读取外部文件系统,包括hdfs和本地文件系统,可以分区
              • 2-sc.wholeTextFiles:可以读取文件夹下面的所有小文件,可以分区
                • 读取分区数据:file_rdd.glom().collect()
                • 不排序,获取第一个数据:take(1)
                • 默认降序排序,获取第一个数据:top(1)
                • 默认升序排序,获取第一个数据:takeOrdered(1)
                • 3-并行度分区总结
                  • 0.local[]不指定时,那么默认就是 4,且 minparallelism 最小值是 2
                  • 1.setMaster(local[5]),如果不额外指定,那么getNumPartitions 就是5
                  • 2.setMaster(local[5],而parallelize([], 3)),那么getNumPartitions 就是3
                  • 3.wholeTextFile(,3), 那么getNumPartitions 就是 3
                  • 4.如果sc.textFile读取的是文件夹中多个文件,这里的分区个数是以文件个数为主的,自己写的分区不起作用
                    _02_createWholeTextFile.py
                    # -*- coding: utf-8 -*-
                    # Program function:创建RDD的两种方式
                    '''
                    第一种方式:使用并行化集合,本质上就是将本地集合作为参数传递到sc.pa
                    第二种方式:使用sc.textFile方式读取外部文件系统,包括hdfs和本地文件系统
                    1-准备SparkContext的入口,申请资源
                    2-使用rdd创建的第一种方法
                    3-使用rdd创建的第二种方法
                    4-关闭SparkContext
                    '''
                    from pyspark import SparkConf, SparkContext
                    if __name__ == '__main__':
                        print("=========createRDD==============")
                        # 1 - 准备SparkContext的入口,申请资源
                        conf = SparkConf().setAppName("createRDD").setMaster("spark://node1:7077")
                        sc = SparkContext(conf=conf)
                        # 3 - 使用rdd创建的第二种方法
                        # minPartitions最小的分区个数,最终有多少的分区个数,以实际打印为主
                        file_rdd = sc.parallelize([1, 2, 3, 4, 5], 6)  # 分区数 > 元素 ,会有一个分区没有数据
                        print("rdd numpatitions:{}".format(file_rdd.getNumPartitions()))
                        ## 如何打印每个分区的内容
                        print("per partition content:", file_rdd.glom().collect())
                        # wholeTextFiles 读取小文件
                        file_rdd = sc.wholeTextFiles("hdfs://node1:9820/pydata/input/rdddata/ratings100", 3)
                        print("rdd numpartitions:{}".format(file_rdd.getNumPartitions()))
                        # print(" file_rdd per partition content:",file_rdd.glom().collect())   # 映射
                        # 如果sc.textFile读取的是文件夹中多个文件,这里的分区个数是以文件个数为主的,自己写的分区不起作用
                        # file_rdd = sc.textFile("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/ratings100", 3)
                        file_rdd = sc.textFile("hdfs://node1:9820/pydata/input/rdddata/ratings100")
                        print("rdd numpartitions:{}".format(file_rdd.getNumPartitions()))
                        print(file_rdd.take(1))  # take用于获取RDD中从0到num-1下标的元素,不排序。
                        print(file_rdd.top(1))  # top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
                        print(file_rdd.takeOrdered(1))  # 与top相反,取了最小值,默认升序
                        # 4 - 关闭SparkContext
                        print("成功!")
                        sc.stop()
                    	
                    

                    (4) Transformer算子
                    • 1- 单value类型
                      • map操作
                      • filter操作
                      • flatmap操作
                      • groupBy操作
                      • mapValue操作
                        _01_singleValueOperation
                        # -*- coding: utf-8 -*-
                        # Program function:完成单Value类型RDD的转换算子的演示
                        from pyspark import SparkContext, SparkConf
                        import re
                        '''
                        分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                        分区间:有一些操作分区间做一些累加
                        '''
                        if __name__ == "__main__":
                            # 1-创建SparkContext申请资源
                            conf = SparkConf().setAppName("mini").setMaster("local[*]")
                            sc = SparkContext(conf=conf)
                            sc.setLogLevel("WARN")
                            # 2-map操作
                            print("==========================map操作=================================")
                            rdd1 = sc.parallelize([1, 2, 3, 4, 5])
                            print("rdd1 numpartitions:{}".format(rdd1.getNumPartitions()))
                            rdd_map = rdd1.map(lambda x: x * 2)
                            print("rdd_map numpartitions:{}".format(rdd_map.getNumPartitions()))
                            print(rdd_map.glom().collect())
                            # 3-filter操作
                            print("==========================filter操作=================================")
                            print(rdd1.collect())
                            print(rdd1.glom().collect())
                            print(rdd1.filter(lambda x: x > 3).glom().collect())
                            # 4-flatmap操作
                            print("==========================flatmap操作=================================")
                            rdd2 = sc.parallelize(["  hello      you", "hello me  "])
                            # 匹配一个或多个空白字符,包括空格、制表符和换行符
                            print(rdd2.flatMap(lambda word: re.split("\s+", word.strip())).collect())
                            # 5-groupBy操作
                            print("==========================groupBy操作=================================")
                            x = sc.parallelize([1, 2, 3])
                            y = x.groupBy(lambda x: 'A' if (x % 2 == 1) else 'B')
                            print(y.mapValues(list).collect())
                            # 6-mapValue操作
                            print("==========================mapValue操作=================================")
                            x1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
                            def f(x):
                                return len(x)
                            print(x1.mapValues(f).collect())
                        
                        ==========================map操作=================================
                        rdd1 numpartitions:4
                        rdd_map numpartitions:4
                        [[2], [4], [6], [8, 10]]
                        ==========================filter操作=================================
                        [1, 2, 3, 4, 5]
                        [[1], [2], [3], [4, 5]]
                        [[], [], [], [4, 5]]
                        ==========================flatmap操作=================================
                        ['hello', 'you', 'hello', 'me']
                        ==========================groupBy操作=================================
                        [('A', [1, 3]), ('B', [2])]
                        ==========================mapValue操作=================================
                        [('a', 3), ('b', 1)]
                        
                        • 2-双value类型
                          • 并集:A.union(B)
                          • 交集:A.intersection(B)
                          • 差集:大.subtract(小)
                          • 去重:distinct()
                            _02_doubleValuesOperation.py
                            # -*- coding: utf-8 -*-
                            # Program function:完成单Value类型RDD的转换算子的演示
                            from pyspark import SparkConf, SparkContext
                            import re
                            '''
                            分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                            分区间:有一些操作分区间做一些累加
                            '''
                            if __name__ == '__main__':
                                # 1-创建SparkContext申请资源
                                conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                sc = SparkContext.getOrCreate(conf=conf)
                                sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                # 2-对两个RDD求并集
                                print("========================并集=============================")
                                rdd1 = sc.parallelize([1, 2, 3, 4, 5])
                                rdd2 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
                                Union_RDD = rdd1.union(rdd2)
                                print(Union_RDD.collect())
                                # 3-对两个rdd求交集
                                print("========================交集=============================")
                                print(rdd1.intersection(rdd2).collect())
                                # 4-对两个rdd求差集
                                print("========================差集=============================")
                                print(rdd2.subtract(rdd1).collect())
                                # Return a new RDD containing the distinct elements in this RDD.
                                print(Union_RDD.distinct().collect())
                                print(Union_RDD.distinct().glom().collect())
                            
                            ========================并集=============================
                            [1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 7, 8]
                            ========================交集=============================
                            [1, 2, 3, 4, 5]
                            ========================差集=============================
                            [8, 6, 7]
                            [8, 1, 2, 3, 4, 5, 6, 7]
                            [[8], [1], [2], [3], [4], [5], [6], [7]]        ================每个rdd四个分区,两个rdd八个分区
                            
                            • 3-key-Value算子
                              • groupByKey:按照key进行分组有地址,用mapValue获取值
                              • reduceByKey:键一起,值聚合
                              • sortByKey:按照键排序
                              • countByKey:计数
                              • combineByKey是底层API
                              • foldBykey
                              • aggreateBykey
                                _03_keyValuesOperation.py
                                # -*- coding: utf-8 -*-
                                # Program function:完成单Value类型RDD的转换算子的演示
                                from pyspark import SparkConf, SparkContext
                                import re
                                '''
                                分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                分区间:有一些操作分区间做一些累加
                                '''
                                if __name__ == '__main__':
                                    # 1-创建SparkContext申请资源
                                    conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                    sc = SparkContext.getOrCreate(conf=conf)
                                    sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                    # 2-key和value类型算子
                                    # groupByKey
                                    rdd1 = sc.parallelize([("a", 1), ("b", 2)])
                                    rdd2 = sc.parallelize([("c", 1), ("b", 3)])
                                    rdd3 = rdd1.union(rdd2)
                                    key1 = rdd3.groupByKey()
                                    print("=============================groupByKey====================================")
                                    print("groupByKey:", key1.collect())
                                    print(key1.mapValues(list).collect())  # 需要通过mapValue获取groupByKey的值
                                    print(key1.mapValues(tuple).collect())
                                    print("=============================reduceByKey====================================")
                                    # reduceByKey
                                    key2 = rdd3.reduceByKey(lambda x, y: x + y)
                                    print(key2.collect())
                                    print("=============================sortByKey====================================")
                                    # sortByKey
                                    print(key2.map(lambda x: (x[1], x[0])).sortByKey(False).collect())  
                                    # [(5, 'b'), (1, 'c'), (1, 'a')]
                                    print("=============================countByKey====================================")
                                    # countByKey
                                    print(rdd3.countByKey())
                                
                                =============================groupByKey====================================
                                groupByKey: [
                                    ('a', ), 
                                    ('b', ), 
                                    ('c', )]
                                [('a', [1]), ('b', [2, 3]), ('c', [1])]
                                [('a', (1,)), ('b', (2, 3)), ('c', (1,))]
                                =============================reduceByKey====================================
                                [('a', 1), ('b', 5), ('c', 1)]
                                =============================sortByKey====================================
                                [(5, 'b'), (1, 'a'), (1, 'c')]
                                =============================countByKey====================================
                                defaultdict(, {'a': 1, 'b': 2, 'c': 1})
                                  
                                

                                (5)Action算子
                                • 1-主要算子
                                  • first():第一个元素
                                  • reduce(add):元素累加
                                  • reduce(mul):累乘
                                  • takeSample:随机数
                                    _04_actionOperation.py
                                    # -*- coding: utf-8 -*-
                                    # Program function:完成单Value类型RDD的转换算子的演示
                                    from pyspark import SparkConf, SparkContext
                                    import re
                                    '''
                                    分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                    分区间:有一些操作分区间做一些累加
                                    '''
                                    if __name__ == '__main__':
                                        # 1-创建SparkContext申请资源
                                        conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                        sc = SparkContext.getOrCreate(conf=conf)
                                        sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                        # 2-key和value类型算子
                                        # groupByKey
                                        rdd1 = sc.parallelize([("a", 1), ("b", 2)])
                                        rdd2 = sc.parallelize([("c", 1), ("b", 3)])
                                        print(rdd1.first())
                                        print(rdd1.take(2))
                                        print(rdd1.top(2))
                                        print(rdd1.collect())
                                        rdd3 = sc.parallelize([1, 2, 3, 4, 5])
                                        from operator import add
                                        from operator import mul
                                        print(rdd3.reduce(add))
                                        print(rdd3.reduce(mul))
                                        rdd4 = sc.parallelize(range(0, 10))
                                        # 能否保证每次抽样结果是一致的,使用seed随机数种子
                                        print(rdd4.takeSample(True, 3, 123))
                                        print(rdd4.takeSample(True, 3, 123))
                                        print(rdd4.takeSample(True, 3, 123))
                                        print(rdd4.takeSample(True, 3, 34))
                                    
                                    ('a', 1)
                                    [('a', 1), ('b', 2)]
                                    [('b', 2), ('a', 1)]
                                    [('a', 1), ('b', 2)]
                                    15
                                    120
                                    [4, 7, 2]
                                    [4, 7, 2]
                                    [4, 7, 2]
                                    [9, 9, 4]
                                    
                                    • 2-其他算子
                                      • foreach:Applies a function to all elements of this RDD.
                                      • foreachPartition:Applies a function to each partition of this RDD.
                                      • map:按照元素进行转换
                                      • mapPartiton:按照分区进行转换
                                        _05_otherOperation.py
                                        # -*- coding: utf-8 -*-
                                        # Program function:完成单Value类型RDD的转换算子的演示
                                        from pyspark import SparkConf, SparkContext
                                        import re
                                        '''
                                        分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                        分区间:有一些操作分区间做一些累加
                                        '''
                                        def f(iterator):  # 【1,2,3】 【4,5】
                                            for x in iterator:  # for x in 【1,2,3】  x=1,2,3 print 1.2.3
                                                print(x)
                                        def f1(iterator):  # 【1,2,3】 【4,5】  sum(1+2+3) sum(4+5)
                                            yield sum(iterator)
                                        if __name__ == '__main__':
                                            # 1-创建SparkContext申请资源
                                            conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                            sc = SparkContext.getOrCreate(conf=conf)
                                            sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                            # 2-foreach-Applies a function to all elements of this RDD.
                                            rdd1 = sc.parallelize([("a", 1), ("b", 2)])
                                            print(rdd1.glom().collect())
                                            # def f(x):print(x)
                                            rdd1.foreach(lambda x: print(x))
                                            # 3-foreachPartition--Applies a function to each partition of this RDD.
                                            # 从性能角度分析,按照分区并行比元素更加高效
                                            rdd1.foreachPartition(f)
                                            # 4-map---按照元素进行转换
                                            rdd2 = sc.parallelize([1, 2, 3, 4])
                                            print(rdd2.map(lambda x: x * 2).collect())
                                            # 5-mapPartiton-----按照分区进行转换
                                            # Return a new RDD by applying a function to each partition of this RDD.
                                            print(rdd2.mapPartitions(f1).collect())  # [3, 7]
                                        
                                        [[], [('a', 1)], [], [('b', 2)]]
                                        ('a', 1)
                                        ('b', 2)
                                        ('b', 2)
                                        ('a', 1)
                                        [2, 4, 6, 8]
                                        [1, 2, 3, 4]
                                          
                                        

                                        (6)重要函数
                                        • 1-重分区函数

                                          repartition:默认调用的是coalese的shuffle为True的方法(会产生分区数据为空)

                                          coalese:减少分区函数

                                          PartitonBy:可以调整分区,还可以调整分区器(一种hash分区器(一般打散数据)

                                          _06_repartitionOperation.py
                                          # -*- coding: utf-8 -*-
                                          # Program function:完成单Value类型RDD的转换算子的演示
                                          from pyspark import SparkConf, SparkContext
                                          import re
                                          '''
                                          分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                          分区间:有一些操作分区间做一些累加
                                          alt+6 可以调出来所有TODO,
                                          TODO是Python提供了预留功能的地方
                                          '''
                                          if __name__ == '__main__':
                                              # TODO:  1-创建SparkContext申请资源
                                              conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                              sc = SparkContext.getOrCreate(conf=conf)
                                              sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                              # TODO:   2-执行重分区函数--repartition
                                              rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
                                              print("partitions num:", rdd1.getNumPartitions())
                                              print(rdd1.glom().collect())  # [[1, 2], [3, 4], [5, 6]]
                                              print("repartition result:")
                                              # TODO:   repartition可以增加分区也可以减少分区,但是都会产生shuflle,如果减少分区的化建议使用coalesc避免发生shuffle
                                              rdd__repartition1 = rdd1.repartition(5)
                                              print("increase partition", rdd__repartition1.glom().collect())  # [[], [1, 2], [5, 6], [3, 4], []]
                                              rdd__repartition2 = rdd1.repartition(2)
                                              print("decrease partition", rdd__repartition2.glom().collect())  # decrease partition [[1, 2, 5, 6], [3, 4]]
                                              # TODO:   3-减少分区--coalese
                                              print(rdd1.coalesce(2).glom().collect())  # [[1, 2], [3, 4, 5, 6]]
                                              print(rdd1.coalesce(5).glom().collect())  # [[1, 2], [3, 4], [5, 6]]
                                              print(rdd1.coalesce(5, True).glom().collect())  # [[], [1, 2], [5, 6], [3, 4], []]
                                              # 结论:repartition默认调用的是coalese的shuffle为True的方法
                                              # TODO:  4-PartitonBy,可以调整分区,还可以调整分区器(一种hash分区器(一般打散数据),一种range分区器(排序拍好的))
                                              # 此类专门针对RDD中数据类型为KeyValue对提供函数
                                              # rdd五大特性中有第四个特点key-value分区器,默认是hashpartitioner分区器
                                              rdd__map = rdd1.map(lambda x: (x, x))
                                              print("partitions length:", rdd__map.getNumPartitions())  # partitions length: 3
                                              print(rdd__map.partitionBy(2).glom().collect())
                                          
                                          ssh://root@192.168.52.3:22/root/anaconda3/envs/pyspark_env/bin/python3 -u /export/data/spark_practice/PySpark-SparkCore_3.1.2/main/rddOperation/_06_repartitionOperation.py
                                          23/12/15 17:11:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                          Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
                                          Setting default log level to "WARN".
                                          To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                          partitions num: 3
                                          [[1, 2], [3, 4], [5, 6]]
                                          repartition result:
                                          increase partition [[], [1, 2], [5, 6], [3, 4], []]
                                          decrease partition [[1, 2, 5, 6], [3, 4]]
                                          [[1, 2], [3, 4, 5, 6]]
                                          [[1, 2], [3, 4], [5, 6]]
                                          [[], [1, 2], [5, 6], [3, 4], []]
                                          partitions length: 3
                                          [[(2, 2), (4, 4), (6, 6)], [(1, 1), (3, 3), (5, 5)]]
                                            
                                          
                                          • byKey类的聚合函数(一)
                                            • fold:聚合计算
                                            • aggregate:fold是aggregate的简化版本,fold分区内和分区间的函数是一致的
                                              _07_ByKeyOperation.py
                                              # -*- coding: utf-8 -*-
                                              # Program function:完成单Value类型RDD的转换算子的演示
                                              from pyspark import SparkConf, SparkContext
                                              import re
                                              '''
                                              分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                              分区间:有一些操作分区间做一些累加
                                              alt+6 可以调出来所有TODO,
                                              TODO是Python提供了预留功能的地方
                                              '''
                                              def addNum(x,y):
                                                  return x+y
                                              if __name__ == '__main__':
                                                  # TODO:  1-创建SparkContext申请资源
                                                  conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                                  sc = SparkContext.getOrCreate(conf=conf)
                                                  sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                                  # TODO:   2-使用reduce进行聚合计算
                                                  rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
                                                  from operator import add
                                                  # 直接得到返回值-21
                                                  print(rdd1.reduce(add))
                                                  # TODO: 3-使用fold进行聚合计算
                                                  # 第一个参数zeroValue是初始值,会参与分区的计算
                                                  # 第二个参数是执行运算的operation
                                                  print(rdd1.fold(0, add))  # 21
                                                  print(rdd1.getNumPartitions())  # 3
                                                  print(rdd1.glom().collect())
                                                  print("fold result:", rdd1.fold(10, add))
                                                  # TODO: 3-使用aggreate进行聚合计算
                                                  # seqOp分区内的操作, combOp分区间的操作
                                                  print(rdd1.aggregate(0, add, add))  # 21
                                                  print(rdd1.glom().collect())
                                                  print("aggregate result:", rdd1.aggregate(1, add, add))  # aggregate result: 25
                                                  # 结论:fold是aggregate的简化版本,fold分区内和分区间的函数是一致的
                                                  print("aggregate result:", rdd1.aggregate(1, addNum, addNum))  # aggregate result: 25
                                              
                                              21
                                              21
                                              3
                                              [[1, 2], [3, 4], [5, 6]]
                                              fold result: 61
                                              21
                                              [[1, 2], [3, 4], [5, 6]]
                                              aggregate result: 25
                                              aggregate result: 25
                                                
                                              
                                              • byKey类的聚合函数(二)
                                                • groupByKey:会返回一个地址信息,需要mapValues(list)/(sum)取值
                                                • reduceByKey:聚合操作
                                                • foldByKey:有初始值,函数
                                                • aggregateByKey:有初始值,seqOp分区内的操作,combOp分区间的操作
                                                  _08_ByKeyOperation.py
                                                  # -*- coding: utf-8 -*-
                                                  # Program function:完成单Value类型RDD的转换算子的演示
                                                  from pyspark import SparkConf, SparkContext
                                                  import re
                                                  '''
                                                  分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                                  分区间:有一些操作分区间做一些累加
                                                  alt+6 可以调出来所有TODO,
                                                  TODO是Python提供了预留功能的地方
                                                  '''
                                                  def addNum(x, y):
                                                      return x + y
                                                  if __name__ == '__main__':
                                                      # TODO:  1-创建SparkContext申请资源
                                                      conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                                      sc = SparkContext.getOrCreate(conf=conf)
                                                      sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                                      # TODO:   2-使用groupByKey进行聚合计算
                                                      from operator import add
                                                      rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
                                                      # groupByKey官网建议如果操作聚合功能建议使用reduceBykey和combineByKey
                                                      group_by_key_rdd = rdd.groupByKey()
                                                      print(group_by_key_rdd.collect())  # [('b', ),
                                                      print("groupBy value is:",
                                                            sorted(group_by_key_rdd.mapValues(list).collect()))  # groupBy value is: [('a', [1, 1]), ('b', [1])]
                                                      # 如何实现基于相同key的value的累加操作,实现wordcount的操作
                                                      print("count value is:", sorted(group_by_key_rdd.mapValues(sum).collect()))  # count value is: [('a', 2), ('b', 1)]
                                                      # TODO:   2-使用reduceByKey进行聚合计算,redueByKey提前会实现预聚合功能,性能更好
                                                      reduce_by_key_rdd = rdd.reduceByKey(lambda x, y: x + y)
                                                      print(reduce_by_key_rdd.collect())
                                                      # TODO: 3-使用foldByKey算子进行聚合操作
                                                      fold_by_key_rdd = rdd.foldByKey(0, add)
                                                      print("fold_by_key_rdd:", fold_by_key_rdd.collect())
                                                      # 这里初始值给1,小测试
                                                      print(rdd.getNumPartitions())
                                                      print("rdd glom result:", rdd.glom().collect())  # rdd glom result: [[('a', 1)], [('b', 1), ('a', 1)]]
                                                      fold_by_key_rdd1 = rdd.foldByKey(1, add)
                                                      print(fold_by_key_rdd1.collect())  # [('b', 2), ('a', 4)]
                                                      # TODO: 4-使用aggreateByKey算子进行聚合操作,这里也是类似于fold和aggreate,foldByKey是简化版本的aggreateByKey,
                                                      #  foldByKey的分区内和分区间的函数是一致的
                                                      aggregate_by_key_rdd = rdd.aggregateByKey(0, add, add)
                                                      print("aggregate_by_key_rdd", aggregate_by_key_rdd.collect())  # aggregate_by_key_rdd [('b', 1), ('a', 2)]
                                                      aggregate_by_key_rdd1 = rdd.aggregateByKey(1, add, add)
                                                      print(aggregate_by_key_rdd1.collect())  # [('b', 2), ('a', 4)]
                                                  

                                                  (7) WordCount的处理方法
                                                  • 1-数据处理:filter过滤-flatMap扁平化-map映射格式
                                                  • 2-reduceByKey进行聚合计算
                                                  • 3-groupByKey进行聚合计算,记得mapValues(sum)
                                                  • 4-foldByKey算子进行聚合操作
                                                  • 5-aggreateByKey算子进行聚合操作
                                                    _09_wordcountWay.py
                                                    # -*- coding: utf-8 -*-
                                                    # Program function:完成单Value类型RDD的转换算子的演示
                                                    from pyspark import SparkConf, SparkContext
                                                    import re
                                                    '''
                                                    分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                                    分区间:有一些操作分区间做一些累加
                                                    alt+6 可以调出来所有TODO,
                                                    TODO是Python提供了预留功能的地方
                                                    '''
                                                    def addNum(x, y):
                                                        return x + y
                                                    if __name__ == '__main__':
                                                        # TODO:  1-创建SparkContext申请资源
                                                        conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                                        sc = SparkContext.getOrCreate(conf=conf)
                                                        sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                                        # TODO:  2-基础数据处理
                                                        from operator import add
                                                        file_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/words.txt")
                                                        map_rdd = file_rdd \
                                                            .filter(lambda line: len(line.strip()) > 0) \
                                                            .flatMap(lambda line: re.split("\s+", line)) \
                                                            .map(lambda word: (word, 1))
                                                        # print(map_rdd.collect())
                                                        # [('hello', 1), ('you', 1), ('Spark', 1), ('Flink', 1), ('hello', 1), ('me', 1), ('hello', 1), ('she', 1), ('Spark', 1)]
                                                        # TODO:  3-使用reduceByKey进行聚合计算,redueByKey提前会实现预聚合功能,性能更好
                                                        result1 = map_rdd \
                                                            .reduceByKey(lambda x, y: x + y) \
                                                            .collect()
                                                        print("result1 reduceByKey opration is:", result1)
                                                        # TODO:  2-使用groupByKey进行聚合计算
                                                        result2 = map_rdd \
                                                            .groupByKey() \
                                                            .mapValues(sum) \
                                                            .collect()
                                                        print("result2 reduceByKey opration is:", result2)
                                                        # TODO: 4-使用foldByKey算子进行聚合操作
                                                        result3 = map_rdd \
                                                            .foldByKey(0, add) \
                                                            .collect()
                                                        print("result3 reduceByKey opration is:", result3)
                                                        # TODO: 5-使用aggreateByKey算子进行聚合操作,这里也是类似于fold和aggreate,foldByKey是简化版本的aggreateByKey,
                                                        #  foldByKey的分区内和分区间的函数是一致的
                                                        result4 = map_rdd \
                                                            .aggregateByKey(0, add, add) \
                                                            .collect()
                                                        print("result4 reduceByKey opration is:", result4)
                                                    
                                                    result1 reduceByKey opration is: [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)]
                                                    result2 reduceByKey opration is: [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)]
                                                    result3 reduceByKey opration is: [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)]
                                                    result4 reduceByKey opration is: [('Spark', 2), ('Flink', 1), ('hello', 3), ('you', 1), ('me', 1), ('she', 1)]
                                                    

                                                    (8) CombineByKey的两种方法
                                                    • 1-combineByKey(createCombiner, mergeValue, mergeCombiners)-汇聚类
                                                      _10_combineByKeyWay
                                                      # -*- coding: utf-8 -*-
                                                      # Program function:完成单Value类型RDD的转换算子的演示
                                                      from pyspark import SparkConf, SparkContext
                                                      import re
                                                      '''
                                                      分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                                      分区间:有一些操作分区间做一些累加
                                                      alt+6 可以调出来所有TODO,
                                                      TODO是Python提供了预留功能的地方
                                                      '''
                                                      '''
                                                      对初始值进行操作
                                                      '''
                                                      def createCombiner(value): #('a',[1])
                                                          return [value]
                                                      # 这里的x=createCombiner得到的[value]结果
                                                      def mergeValue(x,y): #这里相同a的value=y=1
                                                          x.append(y)#('a', [1, 1]),('b', [1])
                                                          return x
                                                      def mergeCombiners(a,b):
                                                          a.extend(b)
                                                          return a
                                                      if __name__ == '__main__':
                                                          # TODO:  1-创建SparkContext申请资源
                                                          conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                                          sc = SparkContext.getOrCreate(conf=conf)
                                                          sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                                          # TODO:  2-基础数据处理
                                                          from operator import add
                                                          rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
                                                          # [(a:[1,1]),(b,[1,1])]
                                                          print(sorted(rdd.groupByKey().mapValues(list).collect()))
                                                          # 使用自定义集聚合函数组合每个键的元素的通用功能。
                                                          # - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
                                                          # 对初始值进行操作
                                                          # - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
                                                          # 对分区内的元素进行合并
                                                          # - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
                                                          # 对分区间的元素进行合并
                                                          by_key_result = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
                                                          print(sorted(by_key_result.collect()))#[('a', [1, 1]), ('b', [1])]
                                                      
                                                      [('a', [1, 1]), ('b', [1])]
                                                      [('a', [1, 1]), ('b', [1])]
                                                      
                                                      • 2-求平均成绩
                                                        _11_combineByKeyWay2.py
                                                        # -*- coding: utf-8 -*-
                                                        # Program function:完成单Value类型RDD的转换算子的演示
                                                        from pyspark import SparkConf, SparkContext
                                                        import re
                                                        '''
                                                        分区内:一个rdd可以分为很多分区,每个分区里面都是有大量元素,每个分区都需要线程执行
                                                        分区间:有一些操作分区间做一些累加
                                                        alt+6 可以调出来所有TODO,
                                                        TODO是Python提供了预留功能的地方
                                                        '''
                                                        '''
                                                        对初始值进行操作
                                                        [value,1],value指的是当前学生成绩,1代表的是未来算一下一个学生考了几次考试
                                                        ("Fred", 88)---------->[88,1]
                                                        '''
                                                        def createCombiner(value):  #
                                                            return [value, 1]
                                                        '''
                                                        x代表的是 [value,1]值,x=[88,1]
                                                        y代表的相同key的value,比如("Fred", 95)的95,执行分区内的累加
                                                        '''
                                                        def mergeValue(x, y):
                                                            return [x[0] + y, x[1] + 1]
                                                        '''
                                                        a = a[0] value,a[1] 几次考试
                                                        上一步结果(160,2)
                                                        这一步(91,1)
                                                        =160+91,2+1
                                                        '''
                                                        def mergeCombiners(a, b):
                                                            return [a[0] + b[0], a[1] + b[1]]
                                                        if __name__ == '__main__':
                                                            # TODO:  1-创建SparkContext申请资源
                                                            conf = SparkConf().setAppName("mini2").setMaster("local[*]")
                                                            sc = SparkContext.getOrCreate(conf=conf)
                                                            sc.setLogLevel("WARN")  # 一般在工作中不这么写,直接复制log4j文件
                                                            # TODO:  2-基础数据处理
                                                            from operator import add
                                                            # 这里需要实现需求:求解一个学生的平均成绩
                                                            x = sc.parallelize([("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)], 3)
                                                            print(x.glom().collect())
                                                            # 第一个分区("Fred", 88), ("Fred", 95)
                                                            # 第二个分区("Fred", 91), ("Wilma", 93),
                                                            # 第三个分区("Wilma", 95), ("Wilma", 98)
                                                            # reduceByKey
                                                            reduce_by_key_rdd = x.reduceByKey(lambda x, y: x + y)
                                                            print("reduceBykey:", reduce_by_key_rdd.collect())  # [('Fred', 274), ('Wilma', 286)]
                                                            # 如何求解平均成绩?
                                                            # 使用自定义集聚合函数组合每个键的元素的通用功能。
                                                            # - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
                                                            # 对初始值进行操作
                                                            # - `mergeValue`, to merge a V into a C (e.g., adds it to the end ofa list)
                                                            # 对分区内的元素进行合并
                                                            # - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists)
                                                            # 对分区间的元素进行合并
                                                            combine_by_key_rdd = x.combineByKey(createCombiner, mergeValue, mergeCombiners)
                                                            print(combine_by_key_rdd.collect())  # [('Fred', [274, 3]), ('Wilma', [286, 3])]
                                                            # 接下来平均值如何实现--('Fred', [274, 3])---x[0]=Fred x[1]= [274, 3],x[1][0]=274,x[1][1]=3
                                                            print(combine_by_key_rdd.map(lambda x: (x[0], int(x[1][0] / x[1][1]))).collect())
                                                        
                                                        [[('Fred', 88), ('Fred', 95)], [('Fred', 91), ('Wilma', 93)], [('Wilma', 95), ('Wilma', 98)]]
                                                        reduceBykey: [('Fred', 274), ('Wilma', 286)]
                                                        [('Fred', [274, 3]), ('Wilma', [286, 3])]
                                                        [('Fred', 91), ('Wilma', 95)]
                                                        

                                                        (9) join操作
                                                        • join:内连接
                                                        • leftOuterJoin:左连接
                                                        • rightOuterJoin:右连接
                                                          _12_joinOpratition.py
                                                          # -*- coding: utf-8 -*-
                                                          # Program function:演示join操作
                                                          from pyspark import SparkConf, SparkContext
                                                          from pyspark.storagelevel import StorageLevel
                                                          import time
                                                          if __name__ == '__main__':
                                                              print('PySpark join Function Program')
                                                              # TODO:1、创建应用程序入口SparkContext实例对象
                                                              conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
                                                              sc = SparkContext.getOrCreate(conf)
                                                              sc.setCheckpointDir("/export/data/pyspark_workspace/PySpark-SparkCore_3.1.2/data/checkpoint")
                                                              # TODO: 2、从本地文件系统创建RDD数据集
                                                              x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
                                                              y = sc.parallelize([(1001, "sales"), (1002, "tech")])
                                                              # TODO:3、使用join完成联合操作
                                                              join_result_rdd = x.join(y)
                                                              print(join_result_rdd.collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
                                                              print(x.leftOuterJoin(y).collect())
                                                              print(x.rightOuterJoin(y).collect())  # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
                                                              # 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
                                                              join_result_rdd.cache()
                                                              # join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
                                                              # 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
                                                              join_result_rdd.collect()
                                                              # 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
                                                              print(join_result_rdd.count())
                                                              join_result_rdd.unpersist()
                                                              print("释放缓存之后,直接从rdd的依赖链重新读取")
                                                              join_result_rdd.checkpoint()
                                                              join_result_rdd.count()
                                                              # time.sleep(600)
                                                              sc.stop()
                                                          
                                                          [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
                                                          [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech')), (1003, ('wangwu', None)), (1004, ('zhangliu', None))]
                                                          [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
                                                          2
                                                          释放缓存之后,直接从rdd的依赖链重新读取
                                                          

                                                          (10)accumulator累加器
                                                          _13_accumulate.py
                                                          # -*- coding: utf-8 -*-
                                                          # Program function:测试累加器
                                                          # 测试1:python集合的累加器,作为单机版本没有问题
                                                          # 测试2:spark的rdd的集合,直接进行累加操作,不会触发结果到driver的
                                                          # 原因;Driver端定义的变量,在executor执行完毕后没有将结果传递到driver
                                                          # 引出共享变量,Driver和Executor共享变量的改变
                                                          from pyspark import SparkContext, SparkConf
                                                          if __name__ == '__main__':
                                                              conf = SparkConf().setAppName("miniPy").setMaster("local[*]")
                                                              sc = SparkContext(conf=conf)
                                                              # 下面是rdd的集合
                                                              l1 = [1, 2, 3, 4, 5]
                                                              l1_textFile = sc.parallelize(l1)
                                                              # 定义累加器
                                                              acc_num = sc.accumulator(10)
                                                              print(type(acc_num))
                                                              # 执行函数
                                                              def add_num(x):
                                                                  global acc_num
                                                                  # acc_num+=x,这里了累加器提供的默认的方法是add方法
                                                                  acc_num.add(x)
                                                              # 执行foreach
                                                              l1_textFile.foreach(add_num)
                                                              # 输出累加器的值
                                                              print(acc_num)  # 25
                                                              print(acc_num.value)  # 获取累加器的值,25
                                                          
                                                          25
                                                          25
                                                          

                                                          (11)广播变量broadcast
                                                          • collectAsMap():转化为dict类型
                                                          • broadcast:键值对形式
                                                            _14_testBroadcast.py
                                                            # -*- coding: utf-8 -*-
                                                            # Program function:
                                                            '''
                                                            广播变量的作用是提高Spark程序的性能。当需要在多个task上共享某个变量时,如果每个task都直接复制这个变量,那么会很浪费资源。因此,Spark提供了广播变量的机制,可以让Driver将变量广播给所有的Executor,从而使得每个Executor只需要拷贝一份变量,避免了大量的数据复制操作,提高了程序的执行效率。
                                                            '''
                                                            from pyspark import SparkContext, SparkConf
                                                            if __name__ == '__main__':
                                                                conf = SparkConf().setAppName("miniPy").setMaster("local[*]")
                                                                sc = SparkContext(conf=conf)
                                                                # 这里定义rdd
                                                                kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
                                                                print(kvFruit.collect())
                                                                fruit_collect_as_map = kvFruit.collectAsMap()
                                                                print(type(fruit_collect_as_map))
                                                                # 声明广播变量
                                                                broadcast_value = sc.broadcast(fruit_collect_as_map)
                                                                # print(fruit_collect_as_map)#{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
                                                                # 通过指定索引来查询对应水果名称
                                                                friut_ids = sc.parallelize([2, 1, 4, 3])
                                                                # 执行查询
                                                                print("执行查询后的结果")
                                                                # ['orange', 'apple', 'grape', 'banana']
                                                                print(friut_ids.map(lambda fruit: fruit_collect_as_map[fruit]).collect())
                                                                print("执行广播变量之后查询后的结果")
                                                                # 使用广播变量的取值
                                                                print(friut_ids.map(lambda fruit: broadcast_value.value[fruit]).collect())
                                                            
                                                            [(1, 'apple'), (2, 'orange'), (3, 'banana'), (4, 'grape')]
                                                            
                                                            执行查询后的结果
                                                            ['orange', 'apple', 'grape', 'banana']
                                                            执行广播变量之后查询后的结果
                                                            ['orange', 'apple', 'grape', 'banana']
                                                            

                                                            (12) 搜狗案例

                                                            思路:

                                                            • 1-os.version可以克服多版本问题
                                                            • 2-读取数据:过滤单个词长度不为0,过滤整条数据块有6个
                                                            • 3-map映射函数:每个数据字段进行逐一正则表达式:split是切割,sub是替换
                                                            • 4-rdd注意统计count
                                                            • 5- 使用 flatMap 和 lambda 函数进行分词和停用词过滤
                                                            • 6-统计思路:map映射统一格式,reduceByKey将相同的key的value数据累加操作,sortBy进行自定义排序
                                                              _02readSougouFile.py
                                                              # -*- coding: utf-8 -*-
                                                              # Program function:
                                                              from pyspark import SparkContext, SparkConf
                                                              import os
                                                              import re
                                                              import jieba
                                                              os.environ['SPARK_HOME'] = '/export/server/spark'
                                                              PYSPARK_PYTHON = "/root/anaconda3/envs/pyspark_env/bin/python3"
                                                              # 当存在多个版本时,不指定很可能会导致出错
                                                              os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
                                                              os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
                                                              if __name__ == '__main__':
                                                                  print('PySpark RDD Program')
                                                                  # TODO:1、创建应用程序入口SparkContext实例对象
                                                                  conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
                                                                  sc = SparkContext.getOrCreate(conf)
                                                                  # TODO: 2、从文件系统加载数据,创建RDD数据集
                                                                  # TODO: 3、调用集合RDD中函数处理分析数据
                                                                  print("=====================1-读取数据==========================")
                                                                  resultRDD2 = sc.textFile("file:///export/data/spark_practice/PySpark-SparkCore_3.1.2/data/sougou/SogouQ.reduced") \
                                                                      .filter(lambda line: (len(line.strip()) > 0) and len(re.split("\\s+", line)) == 6) \
                                                                      .map(lambda line: (
                                                                      re.split("\\s+", line)[0],
                                                                      re.split("\\s+", line)[1],
                                                                      re.sub("\\[|\\]", "", re.split("\\s+", line)[2]),
                                                                      re.split("\\s+", line)[3],
                                                                      re.split("\\s+", line)[4],
                                                                      re.split("\\s+", line)[5]))
                                                                  print("count={},first={}".format(resultRDD2.count(), resultRDD2.first()))
                                                                  print(type(resultRDD2.collect()))
                                                                  print("=====================2-搜索关键词统计==========================")
                                                                  # TODO: 4、获取用户【查询词】,使用JieBa进行分词,按照单词分组聚合统计出现次数,类似WordCount程序
                                                                  # reduceByKey将相同的key的value数据累加操作
                                                                  # 定义正则表达式模式,匹配中文字符
                                                                  pattern = "[\u4e00-\u9fa5]+"
                                                                  regex = re.compile(pattern)
                                                                  # 定义停用词表路径
                                                                  stopwords_path = '/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/stop_table/哈工大停用词表.txt'
                                                                  # 加载停用词表
                                                                  with open(stopwords_path, 'r', encoding='utf-8') as file:
                                                                      stopwords = [line.strip() for line in file]
                                                                  # 使用 flatMap 和 lambda 函数进行分词和停用词过滤
                                                                  flat_map = resultRDD2.flatMap(
                                                                      lambda record: [word for word in jieba.cut(record[2])    # jieba分词
                                                                                      if regex.match(word)                      # 匹配中文
                                                                                      and word not in stopwords                 # 去除停用词
                                                                                      and len(word) > 1]                         # 去除单个词
                                                                  )
                                                                  key1 = flat_map.map(lambda word: (word, 1)) \
                                                                      .reduceByKey(lambda k1, k2: k1 + k2) \
                                                                      .sortBy(lambda x: x[1], ascending=False) \
                                                                      .take(10)
                                                                  print(key1)
                                                                  # key2 = flat_map.map(lambda word: (word, 1)) \
                                                                  #     .reduceByKey(lambda k1, k2: k1 + k2) \
                                                                  #     .map(lambda x,y:(y,x))
                                                                  #     #.sortByKey(True, 3, keyfunc=lambda k: k.lower())\
                                                                  # print(key2.collect())
                                                                  # TODO: 5、用户搜索点击统计
                                                                  print("=====================2-用户搜索点击统计==========================")
                                                                  # 统计出每个用户每个搜索词点击网页的次数,可以作为搜索引擎搜索效果评价指标。先按照用户ID分组,再按照【查询词】分组,最后统计次数,求取最大次数、最小次数及平均次数
                                                                  flat_map = resultRDD2.map(lambda record: (record[1], record[2]))
                                                                  # print(flat_map.take(1)) 测试打印
                                                                  key3 = flat_map.map(lambda word: (word, 1)) \
                                                                      .reduceByKey(lambda k1, k2: k1 + k2) \
                                                                      .sortBy(lambda x: x[1], ascending=False) \
                                                                      .take(10)
                                                                  print(key3)
                                                                  print("=====================3-搜索时间段统计==========================")
                                                                  # 按照【访问时间】字段获取【小时】,分组统计各个小时段用户查询搜索的数量,进一步观察用户喜欢在哪些时间段上网,使用搜狗引擎搜索
                                                                  hourSearchRDD = resultRDD2.map(lambda record: str(record[0])[0:2])
                                                                  print(hourSearchRDD.take(5))
                                                                  key4 = hourSearchRDD.map(lambda word: (word, 1)) \
                                                                      .reduceByKey(lambda k1, k2: k1 + k2) \
                                                                      .sortBy(lambda x: x[1], ascending=False) \
                                                                      .take(10)
                                                                  print(key4)
                                                                  sc.stop()
                                                              
                                                              =====================1-读取数据==========================
                                                              count=1721708,first=('00:00:00', '2982199073774412', '360安全卫士', '8', '3', 'download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html')
                                                              
                                                              =====================2-搜索关键词统计==========================
                                                              [('地震', 89367), ('救灾物资', 69092), ('哄抢', 69084), ('汶川', 68409), ('原因', 62688), ('下载', 39462), ('图片', 31979), ('视频', 29157), ('暗娼', 26825), ('朗斯', 20049)]
                                                              =====================2-用户搜索点击统计==========================
                                                              [(('7822241147182134', 'free+girls'), 274), (('9165829432475153', '人妖摄影'), 177), (('49180486532951556', 'babes'), 160), (('519493440787543', '镣铐绳艺视频'), 156), (('19447614244798927', '绳艺kb视频'), 141), (('7076435807359547', '库娃+三围'), 135), (('5515612701706876', '玄幻小说'), 134), (('900755558064074', 'liuhecai'), 123), (('1756178764125793', '屋面种植土'), 118), (('5634142212517204', '亭子施工图'), 116)]
                                                              =====================3-搜索时间段统计==========================
                                                              ['00', '00', '00', '00', '00']
                                                              [('16', 116540), ('21', 115126), ('20', 110863), ('15', 109086), ('10', 104694), ('17', 104639), ('14', 101295), ('22', 99977), ('11', 97991), ('19', 97129)]
                                                                
                                                              

                                                              (13)网站点击案例

                                                              思路:

                                                              • 计算pv:计算多少行,map格式化,reduceByKey聚合
                                                              • 计算uv:筛选ip,去重后计数
                                                              • 计算topK:下标记得提前筛选长度,需要使用"-"
                                                                # -*- coding: utf-8 -*-
                                                                # Program function:完成网站访问指标的统计,Pv,Uv,TopK
                                                                '''
                                                                * 1-准备SparkContext的环境
                                                                * 2-读取网站日志数据,通过空格分隔符进行分割
                                                                * 3-计算Pv,统计有多少行,一行就算做1次Pv
                                                                * 4-计算Uv,筛选出ip,统计去重后Ip
                                                                * 5-计算topk,筛选出对应业务的topk
                                                                '''
                                                                from pyspark import SparkConf, SparkContext
                                                                if __name__ == '__main__':
                                                                    # *1 - 准备SparkContext的环境
                                                                    conf = SparkConf().setAppName("click").setMaster("local[*]")
                                                                    sc = SparkContext.getOrCreate(conf=conf)
                                                                    sc.setLogLevel("WARN")  # 直接将log4j放入文件夹中
                                                                    # *2 - 读取网站日志数据,通过空格分隔符进行分割
                                                                    file_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/click/access.log")
                                                                    # *3 - 计算Pv,统计有多少行,一行就算做1次Pv
                                                                    rdd_map_rdd = file_rdd.map(lambda line: ("pv", 1))
                                                                    print("pv result is:", rdd_map_rdd.reduceByKey(lambda x, y: x + y).collect())  # pv result is: [('pv', 14619)]
                                                                    # *4 - 计算Uv,筛选出ip,统计去重后Ip
                                                                    # file_rdd_map = file_rdd.map(lambda line: line.split(" ")[0])
                                                                    file_rdd_map = file_rdd \
                                                                        .map(lambda line: line.split(" ")) \
                                                                        .map(lambda x: x[0])
                                                                    # print(file_rdd_map.take(5))
                                                                    uv_num = file_rdd_map \
                                                                        .distinct() \
                                                                        .map(lambda line: ("uv", 1))
                                                                    print("uvCount:", uv_num.reduceByKey(lambda x, y: x + y).collect())  # uvCount: [('uv', 1051)]
                                                                    # *5 - 计算topk,筛选出对应业务的topk,访问网站【10下标】的Topk,"需要使用\"-\"
                                                                    re = file_rdd \
                                                                        .map(lambda x: x.split(" ")) \
                                                                        .filter(lambda line: len(line) > 10) \
                                                                        .map(lambda line: (line[10], 1)) \
                                                                        .reduceByKey(lambda x, y: x + y) \
                                                                        .sortBy(lambda x: x[1], False) \
                                                                        .filter(lambda x: x[0] != "\"-\"")
                                                                    print(re.take(10))
                                                                    re1 = file_rdd \
                                                                        .map(lambda x: x.split(" ")) \
                                                                        .filter(lambda line: len(line) > 10) \
                                                                        .map(lambda line: (line[10], 1)) \
                                                                        .groupByKey() \
                                                                        .mapValues(sum) \
                                                                        .sortBy(lambda x: x[1], False) \
                                                                        .filter(lambda x: x[0] != "\"-\"")
                                                                    print(re1.take(10))
                                                                
                                                                pv result is: [('pv', 14619)]
                                                                uvCount: [('uv', 1051)]
                                                                [('"http://blog.fens.me/category/hadoop-action/"', 547), ('"http://blog.fens.me/"', 377), ('"http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10"', 360), ('"http://blog.fens.me/r-json-rjson/"', 274), ('"http://blog.fens.me/angularjs-webstorm-ide/"', 271), ('"http://blog.fens.me/wp-content/themes/silesia/style.css"', 228), ('"http://blog.fens.me/nodejs-express3/"', 198), ('"http://blog.fens.me/hadoop-mahout-roadmap/"', 182), ('"http://blog.fens.me/vps-ip-dns/"', 176), ('"http://blog.fens.me/nodejs-underscore/"', 165)]
                                                                [('"http://blog.fens.me/category/hadoop-action/"', 547), ('"http://blog.fens.me/"', 377), ('"http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10"', 360), ('"http://blog.fens.me/r-json-rjson/"', 274), ('"http://blog.fens.me/angularjs-webstorm-ide/"', 271), ('"http://blog.fens.me/wp-content/themes/silesia/style.css"', 228), ('"http://blog.fens.me/nodejs-express3/"', 198), ('"http://blog.fens.me/hadoop-mahout-roadmap/"', 182), ('"http://blog.fens.me/vps-ip-dns/"', 176), ('"http://blog.fens.me/nodejs-underscore/"', 165)]
                                                                  
                                                                

                                                                (14)网站点击案例

                                                                思路:

                                                                • 1-读取两份文件
                                                                • 2-city_ip_rdd作为广播变量
                                                                • 3-用户ip转化为long类型
                                                                • 4-二分查找方法在city_ip_rdd查找对应的经度和维度,让其匹配index,然后定位经纬度
                                                                • 5-mapPartitions(函数):函数应用于分区为单位
                                                                • 6-reduceByKey(lambda x,y:x+y)
                                                                  ipchecklocation.py
                                                                  # -*- coding: utf-8 -*-
                                                                  # Program function:实现用户ip的地址查询,实现相同经纬度范围统计
                                                                  '''
                                                                  * 1-准备Spark的上下文环境
                                                                  * 2-读取用户所在ip信息的文件,切分后选择下标为1字段就是用户ip
                                                                  * 3-读取城市ip段信息,需要获取起始ip的long类型(下标2),结束ip的long类型(下标3),经度(下标13),维度(下标14)
                                                                  * 4-通过ip地址转化为long类型IP
                                                                  * 5-采用折半查找方法寻找ip对应的经纬度
                                                                  * 6-根据相同经纬度的数据进行累加统计在进行排序
                                                                  * 7-画个图
                                                                  '''
                                                                  from pyspark import SparkConf, SparkContext
                                                                  from pyspark.sql import SparkSession
                                                                  # 需要拿到用户ip转化为long类型,然后通过二分查找方法在city_ip_rdd查找对应的经度和维度
                                                                  def ip_transform(ip):
                                                                      ips = ip.split(".")  # [223,243,0,0] 32位二进制数
                                                                      ip_num = 0
                                                                      for i in ips:
                                                                          ip_num = int(i) | ip_num << 8
                                                                      return ip_num
                                                                  def binary_search(ip_num, city_rdd_broadcast_value):
                                                                      start = 0
                                                                      end = len(city_rdd_broadcast_value) - 1
                                                                      # (1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息
                                                                      # city_rdd_broadcast_value
                                                                      while (start <= end):
                                                                          mid = int((start + end) / 2)
                                                                          # 首先判断是否位于middle的[0]和[1]下标之间
                                                                          if (ip_num >= int(city_rdd_broadcast_value[mid][0]) and ip_num <= int(city_rdd_broadcast_value[mid][1])):
                                                                              # 指导找到中间位置,返回mid位置=后面的index
                                                                              return mid
                                                                          # 如果是小于middle[0]起始ip, end=mid
                                                                          if (ip_num < int(city_rdd_broadcast_value[mid][0])):
                                                                              end = mid
                                                                          # 如果是大于middle[1]结束ip, start=mid
                                                                          if (ip_num > int(city_rdd_broadcast_value[mid][1])):
                                                                              start = mid
                                                                  def main():
                                                                      global city_ip_rdd_broadcast
                                                                      # *1 - 准备Spark的上下文环境
                                                                      spark = SparkSession.builder.appName("ipCheck").master("local[*]").getOrCreate()
                                                                      sc = spark.sparkContext
                                                                      sc.setLogLevel("WARN")  # 直接将log4j放入文件夹中
                                                                      # *2 - 读取用户所在ip信息的文件,切分后选择下标为1字段就是用户ip
                                                                      # user_rdd读取的是包含有user的ip地址的信息
                                                                      user_rdd = sc.textFile(
                                                                          "/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/ip/20190121000132394251.http.format")
                                                                      dest_ip_rdd = user_rdd \
                                                                          .map(lambda x: x.split("|")) \
                                                                          .map(lambda x: x[1])
                                                                      # print(dest_ip_rdd.take(4))#['125.213.100.123', '117.101.215.133', '117.101.222.68', '115.120.36.118']
                                                                      # *3 - 读取城市ip段信息,需要获取起始ip的long类型(下标2),结束ip的long类型(下标3),经度(下标13),维度(下标14)
                                                                      # .map(lambda x:(x[2],x[3],x[len(x)-2],x[len(x)-1]))
                                                                      ip_rdd = sc.textFile("/export/data/spark_practice/PySpark-SparkCore_3.1.2/data/ip/ip.txt")
                                                                      city_ip_rdd = ip_rdd \
                                                                          .map(lambda x: x.split("|")) \
                                                                          .map(lambda x: (x[2], x[3], x[13], x[14]))
                                                                      print(type(city_ip_rdd))
                                                                      # Broadcast a read-only variable to the cluster,下面的代码中使用city.collect将rdd转化为list在进行广播
                                                                      city_ip_rdd_broadcast = sc.broadcast(city_ip_rdd.collect())
                                                                      print(type(city_ip_rdd_broadcast))
                                                                      # city_ip_rdd是包含有城市的(1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息
                                                                      # print(city_ip_rdd.take(5))
                                                                      def GetPos(x):  # x=【192.168.88.161,192.168.88.161】
                                                                          city_rdd_broadcast_value = city_ip_rdd_broadcast.value
                                                                          def getResult(ip):
                                                                              # *4 - 通过ip地址转化为long类型IP
                                                                              ip_num = ip_transform(ip)
                                                                              # *5 - 采用折半查找方法寻找ip对应的经纬度
                                                                              # index 获取 (1)起始ip的long(2)结束ip的long(3)经度(4)维度的信息
                                                                              index = binary_search(ip_num, city_rdd_broadcast_value)
                                                                              return ((city_rdd_broadcast_value[index][2], city_rdd_broadcast_value[index][3]), 1)
                                                                          # 得到的是((经度,维度),1),下面是python的map函数
                                                                          re = map(tuple, [getResult(ip) for ip in x])  # 【ip=192.168.88.161,ip=192.168.88.161】
                                                                          return re
                                                                      # *6 - 根据相同经纬度的数据进行累加统计在进行排序  【192.168.88.161,192.168.88.161】
                                                                      ip_rdd_map_partitions = dest_ip_rdd.mapPartitions(GetPos)
                                                                      result = ip_rdd_map_partitions.reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False)
                                                                      print("final sorted result is:")
                                                                      print(result.take(5))
                                                                      # [(('108.948024', '34.263161'), 1824), (('116.405285', '39.904989'), 1535), (('106.504962', '29.533155'), 400), (('114.502461', '38.045474'), 383), (('106.57434', '29.60658'), 177)]
                                                                      # *7 - 画个图
                                                                      sc.stop()
                                                                  if __name__ == '__main__':
                                                                      main()
                                                                  
                                                                  
                                                                  
                                                                  final sorted result is:
                                                                  [(('108.948024', '34.263161'), 1824), (('116.405285', '39.904989'), 1535), (('106.504962', '29.533155'), 400), (('114.502461', '38.045474'), 383), (('106.57434', '29.60658'), 177)]
                                                                  

                                                                   
打赏
版权声明:本文采用知识共享 署名4.0国际许可协议 [BY-NC-SA] 进行授权
文章名称:《Spark重温笔记(二):快如闪电的大数据计算框架——你真的了解SparkCore的 RDD 吗?(包含企业级搜狗案例和网站点击案例)》
文章链接:https://goodmancom.com/wl/175915.html