共计 5967 个字符,预计需要花费 15 分钟才能阅读完成。
今天就跟大家聊聊有关如何理解 TopK 算法及其实现,可能很多人都不太了解,为了让大家更加了解,丸趣 TV 小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
1、问题描述
在大数据规模中,经常遇到一类需要求出现频率最高的 K 个数,这类问题称为“TOPK”问题!例如:统计歌曲中最热门的前 10 首歌曲,统计访问流量最高的前 5 个网站等。
2、例如统计访问流量最高的前 5 个网站:
数据 test.data 文件:
数据格式解释:域名 上行流量 下行流量
思路:
1、Mapper 每解析一行内容, 按照 \t 获取各个字段
2、因为 URL 有很多重复记录, 所以将 URL 放到 key(通过分析 MapReduce 原理), 流量放在 value
3、在 reduce 统计总流量, 通过 TreeMap 进行对数据进行缓存, 最后一并输出(值得注意的是要一次性输出必须要用到 Reduce 类的 cleanup 方法)
程序如下:
Mapper 类:
package com.itheima.hadoop.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLMapper extends Mapper LongWritable, Text, Text, FlowBean {
/**
* @param key
* : 每一行偏移量
* @param value
* : 每一行的内容
* @param context
* : 环境上下文
*/
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/**
* 该计数器是 org.apache.hadoop.mapreduce.Counter
*/
Counter counter = context
.getCounter( ExistProblem , ExistProblemLine // 自定义存在问题的行错误计数器
String line = value.toString(); // 读取一行数据
String[] fields = line.split( \t // 获取各个字段, 按照 \t 划分
try { String url = fields[0]; // 获取 URL 字段
long upFlow = Long.parseLong(fields[1]); // 获取上行流量 (upFlow) 字段
long downFlow = Long.parseLong(fields[2]); // 获取下行流量 (downFlow) 字段
FlowBean bean = new FlowBean(upFlow, downFlow); // 将上行流量和下行流量封装到 bean 中
Text tUrl = new Text(url); // 将 java 数据类型转换 hadoop 数据类型
context.write(tUrl, bean); // 传递的数据较多,封装到 bean 进行传输(tips:bean 传输时需要注意序列化问题)
} catch (Exception e) { e.printStackTrace();
counter.increment(1); // 记录错误行数
}
}
}
Reduce 类:
package com.itheima.hadoop.mapreduce.reducer;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLReducer extends Reducer Text, FlowBean, FlowBean, Text {
private TreeMap FlowBean, Text treeMap = new TreeMap FlowBean, Text
/**
* @param key
* : 每一行相同 URL
* @param values
* : 总流量 bean
*/
@Override
public void reduce(Text key, Iterable FlowBean values, Context context)
throws IOException, InterruptedException {
long countUpFlow = 0;
long countDownFlow = 0;
/*
* 1、取出每个 bean 的总流量 2、统计多个 bean 的总流量 3、缓存到 treeMap 中
*/
for (FlowBean bean : values) { countUpFlow += bean.getUpFlow(); // 统计上行流量
countDownFlow += bean.getDownFlow(); // 统计下行总流量
}
// 封装统计的流量
FlowBean bean = new FlowBean(countUpFlow, countDownFlow);
treeMap.put(bean, new Text(key)); // 缓存到 treeMap 中
}
@Override
public void cleanup(Context context) throws IOException,
InterruptedException {
// 遍历缓存
for (Entry FlowBean,Text entry : treeMap.entrySet()) { context.write(entry.getKey(), entry.getValue());
}
super.cleanup(context); // 不能动原本的销毁操作
}
}
FlowBean 类:
package com.itheima.hadoop.mapreduce.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable, Comparable FlowBean {
private long upFlow;
private long downFlow;
private long maxFlow;
@Override
public String toString() {
return upFlow + \t + downFlow + \t + maxFlow;
}
/**
* 1、序列化注意的问题, 序列化需要默认的构造方法 (反射) 2、在 readFields() 和 write()方法中, 应该遵循按照顺序写出和读入
*/
public FlowBean() { }
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.maxFlow = upFlow + downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getMaxFlow() {
return maxFlow;
}
public void setMaxFlow(long maxFlow) {
this.maxFlow = maxFlow;
}
@Override
public void readFields(DataInput dataIn) throws IOException { upFlow = dataIn.readLong();
downFlow = dataIn.readLong();
maxFlow = dataIn.readLong();
}
@Override
public void write(DataOutput dataOut) throws IOException { dataOut.writeLong(upFlow);
dataOut.writeLong(downFlow);
dataOut.writeLong(maxFlow);
}
@Override
public int compareTo(FlowBean o) {
return this.maxFlow o.maxFlow ? -1
: this.maxFlow o.maxFlow ? 1 : 0;
}
}
驱动类:
package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;
public class TopKURLDriver extends Configured implements Tool{
@Override
public int run(String[] args) throws Exception {
/**
* 1、创建 job 作业
* 2、设置 job 提交的 Class
* 3、设置 MapperClass, 设置 ReduceClass
* 4、设置 Mapper 和 Reduce 各自的 OutputKey 和 OutputValue 类型
* 5、设置处理文件的路径, 输出结果的路径
* 6、提交 job
*/
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TopKURLRunner.class);
job.setMapperClass(TopKURLMapper.class);
job.setReducerClass(TopKURLReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(FlowBean.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 参数 true 为打印进度
return job.waitForCompletion(true)?0:1;
}
}
package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.runner.TopKURLRunner;
public class TopKURLRunner { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new TopKURLRunner(), args);
System.exit(res);
}
}
运行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData
运行结果:
看完上述内容,你们对如何理解 TopK 算法及其实现有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注丸趣 TV 行业资讯频道,感谢大家的支持。