共计 9080 个字符,预计需要花费 23 分钟才能阅读完成。
这篇文章将为大家详细讲解有关 hadoop 中如何利用 mapreduce 实现 wordcount 和电影评分预测,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
mapreduce 中 map 指的是映射,reduce 指的是归约。
mapreduce 是一个以 key-value 形式来处理数据的编程模型,它使用 map 将一组 key-value 映射为另一组 key-value,
通过底层传递给 reduce,在 reduce 中,它将所有 map 过程传递过来的 key-value 进行归约,相同的 key 值,value 值会放在一起。mapreduce 内部还会对 reduce 过程中的 key 值进行一次排序。
一.WordCount
public class WordCount
//
public static final String HDFS = hdfs://localhost:8888
public static final Pattern DELIMITER = Pattern.compile(\\b([a-zA-Z]+)\\b
// 自定义 Map 类型执行 映射 这一部分
public static class Map extends Mapper LongWritable, Text, Text, IntWritable
{
//mapreduce 中,Text 相当于 String 类型,IntWritable 相当于 Int 类型
//LongWritable 是实现了 WritableComparable 的一个数据类型。 private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
// 重写父类 map() 函数
public void map(LongWritable key, Text value,
Context context)
throws IOException, InterruptedException
{
// 读取一行数据
String line = value.toString();
// 将该行字符全部变为小写
line = line.toLowerCase();
// 根据定义好的正则表达式拆分一行字符串。 Matcher matcher = DELIMITER.matcher(line);
while(matcher.find()){
// 将分解的一个个单词类型转化为 Text。 word.set(matcher.group());
// 将相应的 key-value 值传入。key 值为单词,value 值为 1.
context.write(word,one);
}
}
}
// 自定义 Combine 过程先对本地进行的 map 进行一次 reduce 过程,减少传递给主机的数据量.
public static class Combine extends Reducer Text, IntWritable, Text, IntWritable
{
@Override
public void reduce(Text key, Iterable IntWritable values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历同一个 key 值的所有 value, 所有的 value 放在同一个 Iterable 中。 for (IntWritable line : values)
{ sum += line.get();
}
IntWritable value = new IntWritable(sum);
// 将 key-value 按照指定的输出格式输出。 context.write(key, value);
}
}
public static class Reduce extends Reducer Text, IntWritable, Text, IntWritable
{
@Override
public void reduce(Text key, Iterable IntWritable values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable line : values)
{ sum += line.get();
}
IntWritable value = new IntWritable(sum);
context.write(key, value);
}
}
public static void main(String[] args) throws Exception
{ JobConf conf = WordCount.config();
String input = data/1.txt
String output = HDFS + /user/hdfs/wordcount
// 自定义 HDFS 文件操作工具类
HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
// 移除存在的文件否则会报文件生成文件已存在的错误
hdfs.rmr(output);
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
// 设置输出的 key 值类型
job.setOutputKeyClass(Text.class);
// 设置输出的 value 值类型
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCount.Map.class);
job.setCombinerClass(WordCount.Combine.class);
job.setReducerClass(WordCount.Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
// 设置输出的格式,这里使用的是自定义的 FileOutputFormat 类,见下文。 job.setOutputFormatClass(ParseTextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static JobConf config() { JobConf conf = new JobConf(WordCount.class);
conf.setJobName( WordCount
conf.addResource( classpath:/hadoop/core-site.xml
conf.addResource( classpath:/hadoop/hdfs-site.xml
conf.addResource( classpath:/hadoop/mapred-site.xml
// conf.set( io.sort.mb ,
return conf;
}
}
自定义文件输出格式
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
public class ParseTextOutputFormat K, V extends FileOutputFormat K, V {
protected static class LineRecordWriter K, V extends RecordWriter K, V {
private static final String utf8 = UTF-8
private static final byte[] newline;
static {
try { newline = \n .getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException( can t find + utf8 + encoding
}
}
protected DataOutputStream out;
private final byte[] keyValueSeparator;
public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException( can t find + utf8 + encoding
}
}
public LineRecordWriter(DataOutputStream out) {
this(out, \t
}
/**
* Write the object to the byte stream, handling Text as a special
* case.
* @param o the object to print
* @throws IOException if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else { out.write(o.toString().getBytes(utf8));
}
}
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey nullValue) {
return;
}
if (!nullKey) { writeObject(key);
}
if (!(nullKey || nullValue)) { out.write(keyValueSeparator);
}
if (!nullValue) { writeObject(value);
}
out.write(newline);
}
public synchronized
void close(TaskAttemptContext context) throws IOException { out.close();
}
}
public RecordWriter K, V
getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException { Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get( mapred.textoutputformat.separator ,
:
CompressionCodec codec = null;
String extension =
if (isCompressed) {
Class ? extends CompressionCodec codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter K, V (fileOut, keyValueSeparator);
} else { FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter K, V (new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}
}
二. 电影评分预测
整个算法的实现中使用了 Slope One 算法来预测评分,此处自定义的输出类与上文一致。
输入文件格式为 userId::movieId::score
package main.java.org.conan.myhadoop.recommend;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class Recommend {
public static final String HDFS = hdfs://localhost:8888
public static final Pattern DELIMITER = Pattern.compile([\t,]
public static final Pattern STRING = Pattern.compile([\t,:]
// public final static int movieListLength = 100000;
// public static int []movieList = new int[movieListLength];
public static List movieList = new ArrayList();
public static Map userScore = new HashMap();
public static void main(String[] args) throws Exception {
Map String, String path = new HashMap String, String
String in = logfile/4.txt
String out = HDFS + /user/hdfs/recommend + /step5
// path.put( data , logfile/small.csv
// path.put( data , logfile/ratings.dat
if(args.length == 2){ in = args[0];
out = HDFS + args[1];
System.out.println(out);
}
// 设置数据输入路径
path.put(data , in);
// 设置第一步输入文件路径
path.put( Step1Input , HDFS + /user/hdfs/recommend
// 设置第一步结果输出路径
path.put(Step1Output , path.get( Step1Input) + /step1
// 设置第二步输入文件路径
path.put(Step2Input , path.get( Step1Output));
// 设置第二步结果输出路径
path.put(Step2Output , path.get( Step1Input) + /step2
// 设置第三步输入文件路径
path.put(Step3Input1 , path.get( data));
// path.put( Step3Input2 , logfile/movie/movies.dat
// 设置第三步结果输出路径
path.put(Step3Output , path.get( Step1Input) + /step3
// path.put(Step3Input2 , path.get( Step2Output));
// path.put(Step3Output2 , path.get( Step1Input) + /step3_2
//
// 设置第四步输入文件路径 1
path.put(Step4Input1 , path.get( Step2Output));
// 设置第四步输入文件路径 2
path.put(Step4Input2 , path.get( Step3Output));
// 设置第四步结果输出路径
path.put(Step4Output , path.get( Step1Input) + /step4
//
// 设置第五步输入文件路径
path.put(Step5Input , path.get( Step4Output));
// path.put(Step5Input2 , path.get( Step3Output2));
// 设置第五步结果输出路径
path.put(Step5Output , out);
// 第一步,根据给出的用户评分文件,求出每个用户对物品的评分矩阵
Step1.run(path);
// 根据第一步的输出结果计算物品评分的同现矩阵
Step2.run(path);
// 获取所有用户评过分的电影,并输出每位用户对每部电影的评分,未评过则记为 0
Step3.run(path);
// 根据第二步与第三步的结果计算出每位用户对每部电影的评分
Step4.run(path);
// 整理输出格式。 Step5.run(path);
System.exit(0);
}
public static JobConf config() { JobConf conf = new JobConf(Recommend.class);
conf.setJobName( Recommand
conf.addResource( classpath:/hadoop/core-site.xml
conf.addResource( classpath:/hadoop/hdfs-site.xml
conf.addResource( classpath:/hadoop/mapred-site.xml
// conf.set( io.sort.mb ,
return conf;
}
}
// 求出用户对物品的评分矩阵,即得出用户对电影 的评分矩阵
// 每一行数据代表一个用户对所有打分电影的结果
//key 值为 userId, value 值为 movieID:score,movieId:score
正文完