Monday, 27 June 2016

Apache Hadoop : NetFlix Data Analysis MapReduce Case Study



NetFlix Data Analysis example

This is a sample Hadoop application that computes, for each movie in Netflix's prize sample data, the overall average rating and the average rating over the first two weeks after the movie's release. It uses a Distributed Cache data join together with a custom RecordReader and InputFormat.


Python script for fetching release dates from the website:


import sys, json, httplib, urllib, socket


if (len(sys.argv) != 3):
        sys.exit("Usage: %s <input filename> <output filename>" % sys.argv[0])

# How many times a lookup is attempted before the movie is written out with
# an 'N/A' release date.
MAX_ATTEMPTS = 3


def fetch_body(query):
        """Return the raw response body for `query`, or None if all attempts fail.

        A fresh connection is opened for every attempt and is always closed
        again, so no socket stays open while the remaining lines are processed.
        Only name-resolution failures (socket.gaierror) are retried; any other
        error propagates to the caller.
        """
        for attempt in range(MAX_ATTEMPTS):
                connection = httplib.HTTPConnection("www.imdbapi.com")
                try:
                        connection.request("GET", query)
                        return connection.getresponse().read()
                except socket.gaierror:
                        if attempt + 1 < MAX_ATTEMPTS:
                                print("Retrying to request...")
                finally:
                        connection.close()
        return None


in_file = open(sys.argv[1], 'r')
out_file = open(sys.argv[2], 'a')

try:
        for line in in_file:
                # Split on the first two commas only: the third field is the
                # title, which may itself contain commas.
                tokens = line.strip().split(",", 2)
                if len(tokens) < 3:
                        # Blank or malformed line: nothing to look up.
                        continue
                title = tokens[2]

                print("processing %s" % tokens[0])

                body = fetch_body("/?t=%s" % urllib.quote_plus(title))

                # 'N/A' is used when the lookup failed or the reply is not JSON.
                released = 'N/A'
                if body is not None:
                        try:
                                json_data = json.loads(body)

                                if 'Released' in json_data:
                                        released = json_data['Released']
                                else:
                                        # No release date in the reply: keep the
                                        # value that came with the input line.
                                        released = tokens[1]

                        except ValueError:
                                released = 'N/A'

                out_file.write("%s\n" % ",".join([tokens[0], released, tokens[2]]))
finally:
        in_file.close()
        out_file.close()


MapClass code:

package com.hadoopgyaan;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.Date;
import java.util.Calendar;
import java.util.Hashtable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

class MapClass extends Mapper<Text, Text, Text, Text> {
    static class MovieInfo {
        public String title;
        public String released;

        public MovieInfo(String title, String released) {
            this.title = title;
            this.released = released;
        }
    };

    private static final Log LOG = LogFactory.getLog(MapClass.class);

    private static final DateFormat moviesDf = new SimpleDateFormat("dd MMM yyyy");

    private Hashtable<Integer, MovieInfo> joinData =
        new Hashtable<Integer, MovieInfo>();

    private Text val = new Text();

    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        String strValue = value.toString();

        if (!strValue.endsWith(":")) {
            String[] fields   = strValue.split(",");
            String rating     = fields[1];
            String ratingDate = fields[2];
            MovieInfo info    = joinData.get(movieIdFromFileName(key));

            if (info != null) {
                val.set(info.released + "," + rating + "," + ratingDate);
                context.write(new Text(info.title), val);
            }
        }
    }

    private Integer movieIdFromFileName(Text fileName) {
        String strKey = fileName.toString();
        return Integer.valueOf(strKey.replaceAll("mv_0+", "").
                replaceAll(".txt", ""));
    }

    @Override
    protected void setup(Context context) {
        Configuration conf = context.getConfiguration();
        Calendar calendar  = Calendar.getInstance();

        try {
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
            if (cacheFiles != null && cacheFiles.length > 0) {
                String line;
                String[] tokens;
                BufferedReader joinReader = new BufferedReader(
                        new FileReader(cacheFiles[0].toString()));
                try {
                    while ((line = joinReader.readLine()) != null) {
                        tokens = line.split(",");
                        try {
                            Date releaseDate = moviesDf.parse(tokens[1]);
                            calendar.setTime(releaseDate);
                            if (calendar.get(Calendar.YEAR) >= 2000) {
                                MovieInfo info = new MovieInfo(tokens[2],
                                                Netflix.ratingDateFormat.format(releaseDate));
                                joinData.put(new Integer(tokens[0]), info);
                            }
                        } catch (ParseException e) {}
                    }
                } finally {
                    joinReader.close();
                }
            }
        } catch (IOException e) {
            System.out.println("Exception reading distributed cache: " + e);
        }
    }
}

ReduceClass Code : 

package com.hadoopgyaan;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceClass extends Reducer<Text, Text, Text, Text> {
    private static final Log LOG = LogFactory.getLog(ReduceClass.class);

    public void reduce(Text key, Iterable<Text> values,
                       Context context) throws IOException,
                                               InterruptedException {
        double sumFirstTwoWeeks = 0;
        int countFirstTwoWeeks = 0;
        double sumOverall = 0;
        int countOverall = 0;

        for(Text rating:values) {
            String[] tokens = rating.toString().split(",");
            int ratingValue = Integer.parseInt(tokens[1]);
            sumOverall   += ratingValue;
            countOverall += 1;
            try {
                Date movieReleaseDate = Netflix.ratingDateFormat.parse(tokens[0]);
                Date ratingDate = Netflix.ratingDateFormat.parse(tokens[2]);

                if (isLessThanTwoWeeks(movieReleaseDate, ratingDate)) {
                    sumFirstTwoWeeks   += ratingValue;
                    countFirstTwoWeeks += 1;
                }
            } catch (ParseException e ) {
                LOG.error("Unable to parse rating date");
            }
        }

        double avgOverall       = sumOverall / countOverall;
        double avgFirstTwoWeeks = sumFirstTwoWeeks / countFirstTwoWeeks;

        context.write(key, new Text(avgFirstTwoWeeks + "\t" + avgOverall));
    }

    private boolean isLessThanTwoWeeks(Date d1, Date d2) {
        long milliseconds = Math.abs(d1.getTime() - d2.getTime());
        return TimeUnit.MILLISECONDS.toDays(milliseconds) <= 14;
    }
}

Netflix Driver Code :

package com.hadoopgyaan;
import java.text.DateFormat;
import java.text.SimpleDateFormat;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Netflix extends Configured implements Tool {
    private static final String jobName = "netflix";

    private static final Log LOG = LogFactory.getLog(Netflix.class);

    public static final DateFormat ratingDateFormat =
        new SimpleDateFormat("yyyy-MM-dd");


    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, jobName);
        job.setJarByClass(Netflix.class);

        if (args.length < 2) {
            usage();
        }

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(MapClass.class);
        job.setReducerClass(ReduceClass.class);

        job.setInputFormatClass(FileNameTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Netflix(), args);

        System.exit(res);
    }

    private void usage() {
        System.err.println("Usage: hadoop jar netflix.jar Netflix <in directory> <out directory>");
        System.exit(1);
    }

}

FileNameLineRecordReader and FileNameTextInputFormat

There is an easier way to get the file name in the map phase, but I left these two classes in as a sample of a custom RecordReader and InputFormat. Maybe someone will find them useful.

FileNameLineRecordReader Code

package com.hadoopgyaan;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

public class FileNameLineRecordReader extends RecordReader<Text, Text> {
    private static final Log LOG = LogFactory.getLog(FileNameLineRecordReader.class);

    private final LineRecordReader lineRecordReader;

    private Text innerValue;

    private Text key;

    private Text value;

    private Path splitPath;

    public Class getKeyClass() { return Text.class; }

    public FileNameLineRecordReader(Configuration conf)
        throws IOException {

        lineRecordReader = new LineRecordReader();
    }

    public void initialize(InputSplit genericSplit,
        TaskAttemptContext context) throws IOException {
        splitPath = ((FileSplit) genericSplit).getPath();
        lineRecordReader.initialize(genericSplit, context);
    }

    public static void setKeyValue(Text key, Text value,
            Text line, Path path) {
        key.set(path.getName());
        value.set(line);
    }

    /** Read key/value pair in a line. */
    public synchronized boolean nextKeyValue() throws IOException {
        String line;
        if (lineRecordReader.nextKeyValue()) {
            innerValue = lineRecordReader.getCurrentValue();
            line = innerValue.toString();
        } else {
            return false;
        }
        if (line == null)
            return false;
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new Text();
        }
        setKeyValue(key, value, innerValue, splitPath);
        return true;
    }

    public Text getCurrentKey() {
        return key;
    }

    public Text getCurrentValue() {
        return value;
    }

    public float getProgress() throws IOException {
        return lineRecordReader.getProgress();
    }

    public synchronized void close() throws IOException {
        lineRecordReader.close();
    }
}


FileNameTextInputFormat Code :

package com.hadoopgyaan;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;

public class FileNameTextInputFormat extends FileInputFormat<Text, Text> {

    protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
            new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        return codec == null;
    }

    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
        TaskAttemptContext context) throws IOException {

        context.setStatus(genericSplit.toString());
        return new FileNameLineRecordReader(context.getConfiguration());
    }


}

Downloads :


I hope this tutorial helps you. If you have any questions or problems, please let me know.

Happy Hadooping with Patrick..

2 comments:

  1. great article thanks

    ReplyDelete
    Replies
    1. Thanks for liking my post. keep checking this blog you'll find new case studies in future also.

      Delete