Monday, 27 June 2016

Apache Hadoop : NetFlix Data Analysis MapReduce Case Study



NetFlix Data Analysis example

This is sample hadoop application that counts overall average rating and first two weeks average rating after movie release from netflix's prize sample data. It uses Distributed Cache data join and custom RecordReader and InputFormat.


Python Script to getting data from website:


import sys, json, httplib, urllib, socket


if (len(sys.argv) != 3):
        sys.exit("Usage: %s <input filename> <output filename>" % sys.argv[0])


in_file = open(sys.argv[1], 'r')
out_file = open(sys.argv[2], 'a')

for line in in_file:
        tokens = line.strip().split(",")
        title = tokens[2]

        print "processing %s" % tokens[0]

        connection = httplib.HTTPConnection("www.imdbapi.com")
        query = "/?t=%s" % urllib.quote_plus(title)
        try:
                connection.request("GET", query)
        except socket.gaierror:
                print "Retrying to request..."

        response = connection.getresponse()

        try:
                json_data = json.loads(response.read())

                if 'Released' in json_data:
                        released = json_data['Released']
                else:
                        released = tokens[1]

        except ValueError:
                released = 'N/A'

        out_file.write("%s\n" % ",".join([tokens[0], released, tokens[2]]))


connection.close()


MappClass code:

package com.hadoopgyaan;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.Date;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

class MapClass extends Mapper<Text, Text, Text, Text> {
    static class MovieInfo {
        public String title;
        public String released;

        public MovieInfo(String title, String released) {
            this.title = title;
            this.released = released;
        }
    };

    private static final Log LOG = LogFactory.getLog(MapClass.class);

    private static final DateFormat moviesDf = new SimpleDateFormat("dd MMM yyyy");

    private Hashtable<Integer, MovieInfo> joinData =
        new Hashtable<Integer, MovieInfo>();

    private Text val = new Text();

    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        String strValue = value.toString();

        if (!strValue.endsWith(":")) {
            String[] fields   = strValue.split(",");
            String rating     = fields[1];
            String ratingDate = fields[2];
            MovieInfo info    = joinData.get(movieIdFromFileName(key));

            if (info != null) {
                val.set(info.released + "," + rating + "," + ratingDate);
                context.write(new Text(info.title), val);
            }
        }
    }

    private Integer movieIdFromFileName(Text fileName) {
        String strKey = fileName.toString();
        return Integer.valueOf(strKey.replaceAll("mv_0+", "").
                replaceAll(".txt", ""));
    }

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        Calendar calendar  = Calendar.getInstance();

        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader joinReader = new BufferedReader(
                        new FileReader(cacheFiles[0].toString()));
                try {
                    while ((line = joinReader.readLine()) != null) {
                        tokens = line.split(",");
                        try {
                            Date releaseDate = moviesDf.parse(tokens[1]);
                            calendar.setTime(releaseDate);
                            if (calendar.get(Calendar.YEAR) >= 2000) {
                                MovieInfo info = new MovieInfo(tokens[2],
                                                Netflix.ratingDateFormat.format(releaseDate));
                                joinData.put(new Integer(tokens[0]), info);
                            }
                        } catch (ParseException e) {}
                    }
                } finally {
                    joinReader.close();
                }
            }
        } catch (IOException e) {
            System.out.println("Exception reading distributed cache: " + e);
        }
    }
}

ReduceClass Code : 

package com.hadoopgyaan;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceClass extends Reducer<Text, Text, Text, Text> {
    private static final Log LOG = LogFactory.getLog(ReduceClass.class);

    public void reduce(Text key, Iterable<Text> values,
                       Context context) throws IOException,
                                               InterruptedException {
        double sumFirstTwoWeeks = 0;
        int countFirstTwoWeeks = 0;
        double sumOverall = 0;
        int countOverall = 0;

        for(Text rating:values) {
            String[] tokens = rating.toString().split(",");
            int ratingValue = Integer.parseInt(tokens[1]);
            sumOverall   += ratingValue;
            countOverall += 1;
            try {
                Date movieReleaseDate = Netflix.ratingDateFormat.parse(tokens[0]);
                Date ratingDate = Netflix.ratingDateFormat.parse(tokens[2]);

                if (isLessThanTwoWeeks(movieReleaseDate, ratingDate)) {
                    sumFirstTwoWeeks   += ratingValue;
                    countFirstTwoWeeks += 1;
                }
            } catch (ParseException e ) {
                LOG.error("Unable to parse rating date");
            }
        }

        double avgOverall       = sumOverall / countOverall;
        double avgFirstTwoWeeks = sumFirstTwoWeeks / countFirstTwoWeeks;

        context.write(key, new Text(avgFirstTwoWeeks + "\t" + avgOverall));
    }

    private boolean isLessThanTwoWeeks(Date d1, Date d2) {
        long milliseconds = Math.abs(d1.getTime() - d2.getTime());
        return TimeUnit.MILLISECONDS.toDays(milliseconds) <= 14;
    }
}

Netflix Driver Code :

package com.hadoopgyaan;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Netflix extends Configured implements Tool {
    private static final String jobName = "netflix";

    private static final Log LOG = LogFactory.getLog(Netflix.class);

    public static final DateFormat ratingDateFormat =
        new SimpleDateFormat("yyyy-MM-dd");


    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, jobName);
        job.setJarByClass(Netflix.class);

        if (args.length < 2) {
            usage();
        }

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormatClass(FileNameTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Netflix(), args);

        System.exit(res);
    }

    private void usage() {
        System.err.println("Usage: hadoop jar netflix.jar Netflix <in directory> <out directory>");
        System.exit(1);
    }

}

FileNameLineRecordReader and FileNameTextInputFormat

There is easier way to get file name from Map part, but I left these two classes as a sample of custom RecordReader and InputFormat. Maybe someone find it useful.

FileNameLineRecordReader Code

package com.hadoopgyaan;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

public class FileNameLineRecordReader extends RecordReader<Text, Text> {
    private static final Log LOG = LogFactory.getLog(FileNameLineRecordReader.class);

    private final LineRecordReader lineRecordReader;

    private Text innerValue;

    private Text key;

    private Text value;

    private Path splitPath;

    public Class getKeyClass() { return Text.class; }

    public FileNameLineRecordReader(Configuration conf)
        throws IOException {

        lineRecordReader = new LineRecordReader();
    }

    public void initialize(InputSplit genericSplit,
        TaskAttemptContext context) throws IOException {
        splitPath = ((FileSplit) genericSplit).getPath();
        lineRecordReader.initialize(genericSplit, context);
    }

    public static void setKeyValue(Text key, Text value,
            Text line, Path path) {
        key.set(path.getName());
        value.set(line);
    }

    /** Read key/value pair in a line. */
    public synchronized boolean nextKeyValue() throws IOException {
        String line;
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.toString();
        } else {
            return false;
        }
        if (line == null)
            return false;
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        setKeyValue(key, value, innerValue, splitPath);
        return true;
    }

    public Text getCurrentKey() {
        return key;
    }

    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}


FileNameTextInputFormat Code :

package com.hadoopgyaan;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class FileNameTextInputFormat extends FileInputFormat<Text, Text> {

    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
        TaskAttemptContext context) throws IOException {

        context.setStatus(genericSplit.toString());
        return new FileNameLineRecordReader(context.getConfiguration());
    }


}

Downloads :


I hope this tutorial will surely help you. If you have any questions or problems please let me know.

Happy Hadooping with Patrick..

Sunday, 26 June 2016

Apache Hadoop : Anagram Finder MapReduce Case Study



Anagram Finder MapReduce example

To identify anagrams words and group them together from Input files which contain n number of varying string.


Problem Statement : Find anagram words from input files , group them together and convert each word into upper case.

AnagramFinderMapper  : This will read input file line by line.It will create a token from line and sort each token to form a key with original value.In this way hadoop will create a group of similar words.


For example : aba and baa will get sorted as aab. So reducer will receive a key as aab with group {aba,baa}


package com.hadoopgyaan;
import java.io.IOException;
import java.util.Arrays;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class AnagramFinderMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value,OutputCollector<Text,Text> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer  tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()){
String originalInput = tokenizer.nextToken();
String sortedKey = sortStringChar(originalInput);
System.out.println("Original Input" + originalInput + "Sorted Input" + sortedKey);
output.collect(new Text(sortedKey), new Text(originalInput));
}
}

private String sortStringChar(String string) {
char[] chars = string.toCharArray();
Arrays.sort(chars);
String sortedString = new String (chars);
System.out.println("Sorted String" + sortedString);
return sortedString;
}

}



AnagramFinderReducer : It will read each member of group and combine them into single string separated by tab.


package com.hadoopgyaan;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class AnagramFinderReducer extends MapReduceBase implements Reducer<Text,Text,Text,IntWritable> {

public void reduce(Text key, Iterator<Text> values,OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {
int sum=1;
String similarWord = "";
while (values.hasNext()) {
 similarWord = similarWord + "\t" + values.next().toString();
 System.out.println( similarWord.toString());
}
output.collect(new Text( similarWord)  ,new IntWritable(sum));
}

}

AnagramFinderDriver :  Now for running the job, we will need a driver program which will create a new job instance, define proper mapper and reducer to the job. Driver program will also define the input and output directory which will be passed as argument.

package com.hadoopgyaan;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.lib.AnagramFinderMapper;
import org.apache.hadoop.mapred.lib.AnagramFinderReducer;
import org.apache.hadoop.mapred.TextOutputFormat;

public class AnagramFinderDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{
JobConf conf = new JobConf();
System.out.println("Execution Started");
conf.setJobName("AnagramFinder");
conf.setJarByClass(AnagramFinderRunner.class);
FileInputFormat.setInputPaths(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));

JobConf conf1 = new JobConf(false);
AnagramFinderMapper.addMapper(conf, SortKeyMapper.class, LongWritable.class, Text.class, Text.class, Text.class, true, conf1);
JobConf redConf =   new JobConf();
AnagramFinderReducer.setReducer(conf, CombineKeyReducer.class, Text.class, Text.class, Text.class, IntWritable.class, true, redConf);
JobConf mapConf =   new JobConf();
AnagramFinderReducer.addMapper(conf, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, NullWritable.class, true, mapConf);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(NullWritable.class);
conf.setOutputFormat((Class<? extends OutputFormat>) TextOutputFormat.class);
JobClient.runJob(conf);
}

}

Downloads :


I hope this tutorial will surely help you. If you have any questions or problems please let me know.

Happy Hadooping with Patrick..