MapReduce
package mapreduce;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MapReduce {

    public interface Function<I, O> {
        public O apply(I input);
    }

    /**
     * Applies the function fun in parallel to all inputs collecting the outputs
     * in the returned list.
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    static public <I, O> List<O> mapInParallel(final Function<I, O> fun, List<I> inputs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        List<Future<O>> futures = new ArrayList<Future<O>>();
        for (final I input : inputs) {
            Callable<O> worker = new Callable<O>() {
                @Override
                public O call() throws Exception {
                    return fun.apply(input);
                }
            };
            Future<O> future = executor.submit(worker);
            futures.add(future);
        }
        List<O> outputs = new ArrayList<O>(inputs.size());
        for (Future<O> future : futures) {
            outputs.add(future.get());
        }
        executor.shutdown();
        return outputs;
    }

    /**
     * Applies the function fun to all inputs collecting the outputs in the
     * returned list.
     */
    static public <I, O> List<O> map(final Function<I, O> fun, List<I> inputs) {
        List<O> outputs = new ArrayList<O>(inputs.size());
        for (I input : inputs) {
            outputs.add(fun.apply(input));
        }
        return outputs;
    }

    public interface Combination<E, R> {
        public R combine(R partial, E element);
    }

    /**
     * Combines the elements of the list one by one to a single element using
     * the combination function com.
     */
    static public <E, R> R reduce(Combination<E, R> com, List<E> elements) {
        R partial = null;
        if (elements == null) {
            return com.combine(partial, null);
        }
        for (E element : elements) {
            partial = com.combine(partial, element);
        }
        return partial;
    }
}
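Before looking at the term frequency example, a minimal usage sketch may help to show how the two primitives fit together. The class name MapReduceDemo and the character-counting task are invented for illustration and are not part of the course code: the map step turns each string into its length, and the reduce step sums the results, tolerating the null arguments that reduce passes on the first call and for empty input.

package mapreduce;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;

import mapreduce.MapReduce.Combination;
import mapreduce.MapReduce.Function;

// Hypothetical usage sketch: count the total number of characters in a word list.
public class MapReduceDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<String> words = Arrays.asList("map", "reduce", "parallel");

        // Map step: turn every string into its length (executed on the thread pool).
        Function<String, Integer> length = new Function<String, Integer>() {
            @Override
            public Integer apply(String input) {
                return input.length();
            }
        };
        List<Integer> lengths = MapReduce.mapInParallel(length, words);

        // Reduce step: add up the lengths; partial is null on the first call.
        Combination<Integer, Integer> sum = new Combination<Integer, Integer>() {
            @Override
            public Integer combine(Integer partial, Integer element) {
                int soFar = (partial == null) ? 0 : partial;
                return (element == null) ? soFar : soFar + element;
            }
        };
        System.out.println(MapReduce.reduce(sum, lengths)); // prints 17
    }
}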
TermFrequency
package mapreduce;

import static mapreduce.MapReduce.mapInParallel;
import static mapreduce.MapReduce.reduce;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import mapreduce.MapReduce.Combination;
import mapreduce.MapReduce.Function;

public class TermFrequency {

    /** Map entry that represents a single occurrence of a word; its value is always 1. */
    static class CountOnceEntry implements Entry<String, Integer> {
        private String key;

        public CountOnceEntry(String key) {
            this.key = key;
        }

        @Override
        public String getKey() {
            return key;
        }

        @Override
        public Integer getValue() {
            return 1;
        }

        @Override
        public Integer setValue(Integer ignored) {
            return 1;
        }

        @Override
        public String toString() {
            return "[key = " + key + ", value = 1]";
        }
    }

    /**
     * Takes a string data and splits it into chunks of roughly the size
     * minChunkSize (the last chunk may be shorter). The method makes sure that
     * a split never happens within a word.
     */
    static List<String> partition(String data, int minChunkSize) {
        int length = data.length();
        List<String> result = new ArrayList<String>(length / minChunkSize);
        int previousEnd = 0;
        while (previousEnd < length) {
            int nextEnd = previousEnd + minChunkSize;
            // Extend the chunk until the next non-letter character.
            while (nextEnd < length && Character.isAlphabetic(data.charAt(nextEnd)))
                nextEnd++;
            if (nextEnd >= length)
                nextEnd = length;
            result.add(data.substring(previousEnd, nextEnd));
            previousEnd = nextEnd;
        }
        return result;
    }

    static Set<String> stopWords = null;

    /** Reads the whole file into memory, using the platform default charset. */
    private static String readFile(String pathToFile) throws IOException {
        File file = new File(pathToFile);
        FileInputStream stream = new FileInputStream(file);
        byte[] rawData = new byte[(int) file.length()];
        stream.read(rawData);
        stream.close();
        return new String(rawData);
    }

    /** Reads the comma-separated stop word file and adds all single letters as stop words. */
    private static Set<String> readStopWords(String pathToFile) throws FileNotFoundException, IOException {
        Set<String> stopWords = new HashSet<String>();
        for (String stopWord : readFile(pathToFile).split(",")) {
            stopWords.add(stopWord);
        }
        for (char c = 'a'; c <= 'z'; c++) {
            stopWords.add(Character.toString(c));
        }
        return stopWords;
    }

    /** Returns the map entries sorted by descending value (highest count first). */
    private static <K, V extends Comparable<V>> List<Entry<K, V>> sort(Map<K, V> map) {
        List<Entry<K, V>> result = new ArrayList<Entry<K, V>>();
        result.addAll(map.entrySet());
        Comparator<Entry<K, V>> comparator = new Comparator<Entry<K, V>>() {
            public int compare(Entry<K, V> left, Entry<K, V> right) {
                return -left.getValue().compareTo(right.getValue());
            }
        };
        Collections.sort(result, comparator);
        return result;
    }

    /**
     * Map step: splits a chunk of text into lowercase words, drops stop words,
     * and emits one (word, 1) entry per occurrence.
     */
    static Function<String, List<Entry<String, Integer>>> splitWords = //
    new Function<String, List<Entry<String, Integer>>>() {
        @Override
        public List<Entry<String, Integer>> apply(String input) {
            List<Entry<String, Integer>> result = new ArrayList<Entry<String, Integer>>();
            Scanner scanner = new Scanner(input.toLowerCase());
            scanner.useDelimiter("[^a-z]+");
            while (scanner.hasNext()) {
                String word = scanner.next();
                if (!stopWords.contains(word))
                    result.add(new CountOnceEntry(word));
            }
            scanner.close();
            return result;
        }
    };

    /**
     * Reduce step: merges the per-chunk word lists into a single frequency map.
     */
    static Combination<List<Entry<String, Integer>>, Map<String, Integer>> countWords = //
    new Combination<List<Entry<String, Integer>>, Map<String, Integer>>() {
        @Override
        public Map<String, Integer> combine(Map<String, Integer> partial, List<Entry<String, Integer>> element) {
            if (partial == null) {
                partial = new HashMap<String, Integer>();
            }
            if (element == null) {
                return partial;
            }
            for (Entry<String, Integer> entry : element) {
                String key = entry.getKey();
                Integer value = entry.getValue();
                Integer count = partial.get(key);
                partial.put(key, (count == null) ? value : count + value);
            }
            return partial;
        }
    };

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        stopWords = readStopWords("stop_words.txt");
        List<String> partitions = partition(readFile("pride-and-prejudice.txt"), 200 * 80);
        List<List<Entry<String, Integer>>> splits = mapInParallel(splitWords, partitions);
        Map<String, Integer> wordFrequencies = reduce(countWords, splits);
        // Print the 25 most frequent words.
        Iterator<Entry<String, Integer>> it = sort(wordFrequencies).iterator();
        for (int printed = 0; it.hasNext() && (printed < 25); printed++) {
            Entry<String, Integer> frequency = it.next();
            System.out.println("" + frequency.getKey() + " - " + frequency.getValue());
        }
    }
}
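One subtlety worth noting is partition: since splitWords counts words in each chunk independently, a chunk boundary in the middle of a word would turn one word into two bogus ones and distort the counts, which is why partition extends every chunk up to the next non-letter character. A tiny, hypothetical check (the class PartitionCheck and the sample sentence are invented for illustration, not part of the course code) makes the behaviour visible:

package mapreduce;

import java.util.List;

public class PartitionCheck {

    public static void main(String[] args) {
        // With minChunkSize = 4 the cut would fall inside "quick",
        // so the first chunk is extended to the end of that word.
        List<String> chunks = TermFrequency.partition("the quick brown fox", 4);
        for (String chunk : chunks) {
            System.out.println("[" + chunk + "]");
        }
        // Expected output:
        // [the quick]
        // [ brown]
        // [ fox]
    }
}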