当前位置 博文首页 > 文章内容

    deletefile,在SparkContext添加deleteFile方法

    作者:Tan09wlll 栏目:Tan的日记 时间:2021-03-06 16:56:23

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



    修改源码的背景

         在大数据框架Spark的源码中我们使用addFile方法将一些文件分发给各个节点,当我们要访问Spark作业中的文件,将使用SparkFiles.get(fileName)找到它的下载位置,但是Spark只提供给我们addFile方法,却没有提供deleteFile。我们知道addFile是SparkContext类的方法,而SparkContext是Spark功能的主要入口。SparkContext代表了与Spark集群的连接,可用于在该集群上创建RDD、累积器和广播变量。每个JVM只能活动一个SparkContext。

    在SparkContext添加deleteFile方法

         下面是SparkContext中addFile的源码,首先我们是不知道源码中是怎么操作这些文件的,我们将通过阅读addFile的源码来学习怎么去添加deleteFile方法,我们只有知道怎么添加才知道怎么去修改它,所谓触类旁通。

         通过上面的源码我们知道,是使用addedFiles 这个ConcurrentHashMap[用于存储每个静态文件/jar的URL以及文件的本地时间戳的

         private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala

         上面的学习我们已经知道了添加的方法,然后就是添加deleteFilele了,具体的实现如下:

    NettyStreamManager中添加deleteFile方法实现

         我们已经了解了在SparkContext添加deleteFile方法,我们了解下NettyStreamManager。NettyStreamManager是StreamManager实现,用于服务于NettyRpcEnv中的文件。在这个管理器中可以注册三种资源,都是由实际文件支持的。

         - "/files":一个扁平的文件列表;作为SparkContext.addFile的后端。

         - "/jars":一个扁平的文件列表;作为SparkContext.addJar的后端。

         - 任意目录;该目录下的所有文件通过管理器变得可用,尊重目录的层次结构。只支持流媒体(openStream)。

         我们还是先看addFile的源码,先看父类RpcEnvFileServer的接口,RpcEnv用来向应用程序所拥有的其他进程提供文件的服务器。该file Server可以返回由普通库处理的URI(如 "http "或 "hdfs"),也可以返回由RpcEnv#fetchFile处理的 "spark "URI。

         2. NettyStreamManager中addFile方法实现

         3. 跟addFile一样,先去父类RpcEnvFileServer中添加deleteFile方法

         4. 接下来我们将在NettyStreamManager中实现deleteFile方法,如下:

    修改Antlr4的SqlBase.g4文件

         我们知道Spark中的SQL解析是通过ANTLR4来解析成语法树的,如果不清楚这个过程,可以阅读我的这篇博客【Spark SQL解析过程以及Antlr4入门】来了解,所以我们如果要在Spark Sql也支持的话,那么需要修改SqlBase.g4这个文件,添加DElETE

    修改SparkSqlParser解析器

         找到org.apache.spark.sql.execution.SparkSqlParser类,添加对移除文件的支持,SparkSqlParser是Spark SQL语句的具体解析器,修visitManageResource方法

         再找到resources.scala文件

    修改SparkSQLCLIDriver

         找到org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver 这个类,然后修改

    源码编译

         我们将源码放到linux的服务器中去编译,然后部署

         /dev/make-distribution.sh --name2.6.0-cdh5.14.2 --tgz-Pyarn-Phadoop-2.6 -Phive-Phive-thriftserver-Dhadoop.version=2.6.0-cdh5.14.2

    验证是否生效