一、 需求叙述:

MapReduce 笔试题:找出互为好友(双方都出现在对方的好友列表中)并且拥有共同好友的用户对,并列出他们的共同好友

输入格式(每行一条记录):user:friend,friend,friend...
---------------
A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J

最终结果(每行:互为好友的用户对,后接两人的共同好友):

A,B C,E

A,C D,F

A,D F,E

A,F B,C,D,E,O

B,E C

C,F A,D

D,E L

D,F A,E

D,L E,F

E,L D

F,M E

H,O A

I,O A

 

 

package com.friends.zb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

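/**
* Finds pairs of users who list each other as friends and who also share
* common friends, using two chained MapReduce jobs (M1/R1 followed by M2/R2).
*
* @author zhangbing
*
*/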
public class Friends {

/**
 * First-pass mapper: inverts every adjacency line so records are keyed by friend.
 *
 * <p>An input line has the form {@code user:friend,friend,...}. For each friend on
 * the line one record {@code (friend, user)} is emitted, so the reducer receives,
 * per person, every user who lists that person as a friend.
 *
 * <p>Blank lines are skipped. Lines without a {@code ':'}-separated friend list or
 * without a user name are skipped and counted under the {@code Friends /
 * MALFORMED_LINES} counter instead of failing the task. Surrounding whitespace on
 * names is trimmed and empty friend tokens are ignored.
 */
public static class M1 extends Mapper<LongWritable, Text, Text, Text> {

    /** Reused output objects; avoids allocating two Text instances per emitted record. */
    private final Text friendKey = new Text();
    private final Text ownerValue = new Text();

    /**
     * Emits one {@code (friend, user)} record per friend listed on the line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the form {@code user:friend,friend,...}
     * @param context sink for the inverted records
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // Blank separator lines carry no data.
            return;
        }

        String[] parts = line.split(":");
        if (parts.length < 2 || parts[0].trim().isEmpty()) {
            // No friend list or no user name: previously an ArrayIndexOutOfBoundsException.
            context.getCounter("Friends", "MALFORMED_LINES").increment(1);
            return;
        }

        ownerValue.set(parts[0].trim());
        for (String token : parts[1].split(",")) {
            String friend = token.trim();
            if (friend.isEmpty()) {
                // "A:B,,C" style gaps would otherwise create a user with an empty name.
                continue;
            }
            friendKey.set(friend);
            context.write(friendKey, ownerValue);
        }
    }
}
public static class R1 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {

List<String> list = new ArrayList<>();

for (Text t : values) {
list.add(t.toString());
}
Text k = new Text();
for (String s1 : list) {
for (String s2 : list) {
if(s1.compareTo(s2)<0){
k.set(s1 "," s2);
context.write(k, key);
}
}
String string = s1 "," key.toString();
if(s1.compareTo(key.toString())>0){
string=key.toString() "," s1;
}
context.write(new Text(string), new Text("1"));
}
}
}

/**
 * Second-pass mapper: re-reads the text output of the first job and forwards each
 * line unchanged as a {@code (key, value)} record.
 *
 * <p>Each line is split on the tab character; the first field becomes the key and
 * the second field the value. Lines with fewer than two fields are skipped and
 * counted under the {@code Friends / MALFORMED_LINES} counter instead of failing
 * the task.
 */
public static class M2 extends Mapper<LongWritable, Text, Text, Text> {

    /** Reused output objects. */
    private final Text pairKey = new Text();
    private final Text payload = new Text();

    /**
     * Splits one tab-separated line into key and value and emits it.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the form {@code key<TAB>value}
     * @param context sink for the parsed record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        if (fields.length < 2) {
            // An empty value leaves no second field after split(); previously this
            // raised ArrayIndexOutOfBoundsException.
            context.getCounter("Friends", "MALFORMED_LINES").increment(1);
            return;
        }
        pairKey.set(fields[0]);
        payload.set(fields[1]);
        context.write(pairKey, payload);
    }
}
public static class R2 extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
int count = 0;
StringBuffer sb = new StringBuffer();
for (Text text : values) {

String s = text.toString();
if("1".equals(s)){
count ;
}else{
sb.append(",").append(s);
}
}
if(count == 2 && sb.length()>0){
context.write(key,new Text(sb.toString().substring(1)));
}
}
}
/**
 * Runs the two chained jobs: M1/R1 from the input to the intermediate directory,
 * then, only if the first job succeeded, M2/R2 from the intermediate directory
 * to the final output directory.
 *
 * <p>Existing intermediate and final output directories are deleted recursively
 * before the corresponding job starts. The JVM exits with status 2 when fewer
 * than three arguments are given and with status 1 when either job fails.
 *
 * @param args {@code args[0]} input path, {@code args[1]} intermediate output path
 *             (output of job 1, input of job 2), {@code args[2]} final output path
 * @throws Exception if job setup, file-system access or job execution fails
 */
public static void main(String[] args) throws Exception {
    if (args.length < 3) {
        System.err.println("Usage: Friends <input> <intermediate-output> <final-output>");
        System.exit(2);
    }

    Configuration conf = new Configuration();
    Path input = new Path(args[0]);
    Path intermediate = new Path(args[1]);
    Path output = new Path(args[2]);

    // && short-circuits, so the second job only runs after the first succeeded.
    boolean succeeded = runJob(conf, M1.class, R1.class, input, intermediate)
            && runJob(conf, M2.class, R2.class, intermediate, output);
    if (!succeeded) {
        System.exit(1);
    }
}

/**
 * Configures and runs one Text-to-Text job, removing a pre-existing output directory first.
 *
 * @return {@code true} if the job completed successfully
 */
private static boolean runJob(Configuration conf,
        Class<? extends Mapper<LongWritable, Text, Text, Text>> mapperClass,
        Class<? extends Reducer<Text, Text, Text, Text>> reducerClass,
        Path input, Path output)
        throws IOException, InterruptedException, ClassNotFoundException {
    Job job = Job.getInstance(conf);
    job.setJarByClass(Friends.class);
    job.setMapperClass(mapperClass);
    job.setReducerClass(reducerClass);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(job, input);

    // Resolve the file system from the path itself so a fully-qualified output
    // URI on a non-default file system is handled as well.
    FileSystem fileSystem = output.getFileSystem(conf);
    if (fileSystem.exists(output)) {
        fileSystem.delete(output, true);
    }
    FileOutputFormat.setOutputPath(job, output);

    return job.waitForCompletion(true);
}
}