    Hadoop Basics: MapReduce Word Frequency Analysis

    Category: Uncategorized | Posted: 2020-11-13 15:01:27


    Requirements

    • Separate out the data that is actually needed from each record
    • Decide what the key and the value should be
    • Decide on the generic types (see the sketch after this list)
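
    A minimal sketch of how those decisions translate into types for this job, assuming the TestTwoVO value class shown later in this post (the wrapper class TypePlan is only a name made up for illustration):

    import mapReduce.phoneTraffic.testOne.vo.TestTwoVO;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TypePlan {
        // map:    (byte offset, raw line)       -> (extracted field, one traffic record)
        static abstract class M extends Mapper<LongWritable, Text, Text, TestTwoVO> { }
        // reduce: (extracted field, records...) -> (extracted field, summed record)
        static abstract class R extends Reducer<Text, TestTwoVO, Text, TestTwoVO> { }
    }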

    Code

    App class

    package mapReduce.phoneTraffic.testOne.app;
    
    import mapReduce.common.CountPartitioner;
    import mapReduce.common.JobUtilThree;
    import mapReduce.phoneTraffic.testOne.vo.TestTwoVO;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    
    public class TestTwoApp{
        public static class mapper extends Mapper<LongWritable, Text,Text, TestTwoVO>{
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                Text oldKey = new Text();
                TestTwoVO oldValue = new TestTwoVO();
                // read one line of input
                String line  = value.toString();
                System.out.println(line);
                String[] files = line.split("\t");
                String ip = files[3];
                int upFlow = Integer.parseInt(files[8]);
                int downFlow = Integer.parseInt(files[9]);
                // pack the parsed fields into the output value
                oldValue.setIp(ip);
                oldValue.setUpFlow(upFlow);
                oldValue.setDownFlow(downFlow);
                // use the extracted field as the output key, then emit the record
                oldKey.set(ip);
                context.write(oldKey,oldValue);
    
            }
        }
        public static class reducer extends Reducer<Text,TestTwoVO,Text, TestTwoVO>{
            @Override
            protected void reduce(Text key, Iterable<TestTwoVO> values, Context context) throws IOException, InterruptedException {
    
                TestTwoVO oldValue = new TestTwoVO();
                // initialise the accumulators
                String ip = "";
                int sum = 0;
                int upFlow = 0;
                int downFlow = 0;
                // iterate over the grouped values
                for (TestTwoVO value : values) {
                    ip = value.getIp();
                    upFlow += value.getUpFlow();
                    downFlow += value.getDownFlow();
                }
                // combine the results
                sum = upFlow+downFlow;
                oldValue.setIp(ip);
                oldValue.setUpFlow(upFlow);
                oldValue.setDownFlow(downFlow);
                oldValue.setSum(sum);
    
    
                // emit the aggregated record
                context.write(key,oldValue);
    
    
            }
        }
    
        public static void main(String[] args) {
            String[] arg = new String[2];
            arg[0] = "E:/input/phoneTraffic.txt";
            arg[1] = "E:/output/testTwo";
            JobUtilThree.submit(TestTwoApp.class,arg,5, CountPartitioner.class);
        }
    }
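
    With the default TextOutputFormat, each reducer writes one line per key: the key, a tab, then the value's toString(). A hypothetical output line for this job could look like the following (the numbers are invented):

    10.0.0.1	TestTwoVO{ip='10.0.0.1', upFlow=120, downFlow=340, sum=460}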

     

    JobUtil class

    package mapReduce.common;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;
    
    public class JobUtilThree {
        public static void submit(Class driverClass,String[] path,Object...objects){
            try {
                Job job = Job.getInstance();
    
                // walk every public inner class of the driver
                Class[] innerClass = driverClass.getClasses();
                for (Class inner : innerClass) {
                    // read the actual type arguments of its generic superclass
                    ParameterizedType parameterizedType = (ParameterizedType) inner.getGenericSuperclass();
                    Type[] types = parameterizedType.getActualTypeArguments();
                    // register the class as mapper or reducer, depending on its superclass
                    if (Mapper.class.isAssignableFrom(inner.getSuperclass())) {
                        job.setMapperClass(inner);
                        job.setMapOutputKeyClass(Class.forName(types[2].getTypeName()));
                        job.setMapOutputValueClass(Class.forName(types[3].getTypeName()));
                    }else if (Reducer.class.isAssignableFrom(inner.getSuperclass())){
                        job.setReducerClass(inner);
                        job.setOutputKeyClass(Class.forName(types[2].getTypeName()));
                        job.setOutputValueClass(Class.forName(types[3].getTypeName()));
                    }
                }
    
                job.setJarByClass(driverClass);
                // delete the output path if it already exists
                FileSystem fileSystem = FileSystem.get(new Configuration());
                if (fileSystem.exists(new Path(path[1]))) {
                    fileSystem.delete(new Path(path[1]),true);
                }
                // set the input and output paths
                FileInputFormat.setInputPaths(job,new Path(path[0]));
                FileOutputFormat.setOutputPath(job,new Path(path[1]));
                for (Object object : objects) {
                    if (object instanceof Integer) {
                        // set the number of reduce tasks (one output partition per reducer)
                        job.setNumReduceTasks((Integer)object);
                    }else if (Partitioner.class.isAssignableFrom((Class)object)){
                        // replace the default partition assignment rule
                        job.setPartitionerClass((Class<? extends Partitioner>) object);
                    }
    
                }
                job.waitForCompletion(true);
    
            } catch (IOException | ClassNotFoundException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
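
    The reflection step above, reading a subclass's concrete type arguments through getGenericSuperclass(), is the part that tends to confuse people. Here is a minimal, standalone sketch of just that mechanism; the class names are hypothetical and unrelated to the job code:

    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;

    public class GenericTypeDemo {
        static class Base<A, B> { }
        static class Child extends Base<String, Integer> { }

        public static void main(String[] args) {
            // Child's generic superclass is Base<String, Integer>
            ParameterizedType pt = (ParameterizedType) Child.class.getGenericSuperclass();
            Type[] types = pt.getActualTypeArguments();
            // prints: java.lang.String java.lang.Integer
            System.out.println(types[0].getTypeName() + " " + types[1].getTypeName());
        }
    }

    JobUtilThree does the same thing against the mapper and reducer inner classes, then hands types[2] and types[3] (the output key and value types) to the Job.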

     

    Custom Partitioner class

    package mapReduce.common;
    
    import mapReduce.phoneTraffic.testOne.vo.TestTwoVO;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class CountPartitioner extends Partitioner<Text,TestTwoVO>{
        @Override
        public int getPartition(Text text, TestTwoVO testTwoVO, int numPartitions) {
            String phoneNumber = text.toString();
            int numReduceTask = 0;

            if (phoneNumber.length() == 11){
                if(phoneNumber.startsWith("1")){
                    numReduceTask = 1;
                }else{
                    numReduceTask = 0;
                }
            }else if (phoneNumber.length() >= 6 && phoneNumber.length() <= 8){
                if(phoneNumber.startsWith("0")){
                    numReduceTask = 2;
                }else{
                    numReduceTask = 3;
                }
            }else{
                // any other key length is treated as malformed data and routed to partition 4
                numReduceTask = 4;
            }
            return numReduceTask;
        }
    }
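
    A quick way to sanity-check the routing is to call getPartition directly with a few sample keys. The keys below are made up, and the second argument is ignored by this partitioner:

    import mapReduce.common.CountPartitioner;
    import org.apache.hadoop.io.Text;

    public class CountPartitionerDemo {
        public static void main(String[] args) {
            CountPartitioner p = new CountPartitioner();
            System.out.println(p.getPartition(new Text("13912345678"), null, 5)); // 11 chars starting with "1"  -> 1
            System.out.println(p.getPartition(new Text("23912345678"), null, 5)); // 11 chars, anything else     -> 0
            System.out.println(p.getPartition(new Text("0411123"), null, 5));     // 6-8 chars starting with "0" -> 2
            System.out.println(p.getPartition(new Text("411123"), null, 5));      // 6-8 chars, anything else    -> 3
            System.out.println(p.getPartition(new Text("bad"), null, 5));         // any other length            -> 4
        }
    }

    Note that this routing only makes sense when the job runs with at least five reduce tasks, which is why main() passes 5 to JobUtilThree.submit().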

     

    VO class

    package mapReduce.phoneTraffic.testOne.vo;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class TestTwoVO implements Writable {
    
        private String ip;
        private int upFlow;
        private int downFlow;
        private int sum;
    
        public String getIp() {
            return ip;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
    
        public int getSum() {
            return sum;
        }
    
        public void setSum(int sum) {
            this.sum = sum;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(ip);
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sum);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.ip = in.readUTF();
            this.upFlow = in.readInt();
            this.downFlow = in.readInt();
            this.sum = in.readInt();
        }
    
        @Override
        public String toString() {
            return "TestTwoVO{" +
                    "ip='" + ip + '\'' +
                    ", upFlow=" + upFlow +
                    ", downFlow=" + downFlow +
                    ", sum=" + sum +
                    '}';
        }
    }
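
    Hadoop serialises this object between the map and reduce phases, so write() and readFields() must handle the same fields in the same order. A minimal round-trip check, with made-up values:

    import mapReduce.phoneTraffic.testOne.vo.TestTwoVO;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TestTwoVORoundTrip {
        public static void main(String[] args) throws IOException {
            TestTwoVO original = new TestTwoVO();
            original.setIp("10.0.0.1");
            original.setUpFlow(120);
            original.setDownFlow(340);
            original.setSum(460);

            // serialise the object the same way Hadoop does, through write(DataOutput)
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));

            // read it back into a fresh instance through readFields(DataInput)
            TestTwoVO copy = new TestTwoVO();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            System.out.println(copy);   // prints the same field values as the original
        }
    }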