SDA SE Wiki

Software Engineering for Smart Data Analytics & Smart Data Analytics for Software Engineering

User Tools

Site Tools


Map Reduce Style

MapReduce

package mapreduce;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MapReduce {

	/**
	 * A one-argument function from {@code I} to {@code O}, applied element-wise
	 * by {@link MapReduce#map} and {@link MapReduce#mapInParallel}.
	 */
	public interface Function<I, O> {
		public O apply(I input);
	}

	/** Pool size used by {@link #mapInParallel(Function, List)}. */
	private static final int DEFAULT_THREADS = 8;

	/**
	 * Applies the function fun in parallel to all inputs collecting the outputs
	 * in the returned list, using a fixed pool of 8 worker threads. The i-th
	 * output corresponds to the i-th input.
	 * 
	 * @throws ExecutionException
	 *             if fun threw for any input (the original exception is the
	 *             cause)
	 * @throws InterruptedException
	 *             if the calling thread is interrupted while waiting
	 */
	static public <I, O> List<O> mapInParallel(final Function<I, O> fun, List<I> inputs) throws InterruptedException,
			ExecutionException {
		return mapInParallel(fun, inputs, DEFAULT_THREADS);
	}

	/**
	 * Applies the function fun in parallel to all inputs using a fixed pool of
	 * {@code threads} worker threads, collecting the outputs in input order.
	 * The pool is always shut down before this method returns, also when an
	 * exception is thrown.
	 * 
	 * @throws IllegalArgumentException
	 *             if threads is not positive
	 * @throws ExecutionException
	 *             if fun threw for any input (the original exception is the
	 *             cause)
	 * @throws InterruptedException
	 *             if the calling thread is interrupted while waiting
	 */
	static public <I, O> List<O> mapInParallel(final Function<I, O> fun, List<I> inputs, int threads)
			throws InterruptedException, ExecutionException {
		if (threads <= 0) {
			throw new IllegalArgumentException("threads must be positive: " + threads);
		}
		ExecutorService executor = Executors.newFixedThreadPool(threads);
		try {
			List<Future<O>> futures = new ArrayList<Future<O>>(inputs.size());
			for (final I input : inputs) {
				futures.add(executor.submit(new Callable<O>() {

					@Override
					public O call() throws Exception {
						return fun.apply(input);
					}

				}));
			}

			List<O> outputs = new ArrayList<O>(inputs.size());
			for (Future<O> future : futures) {
				outputs.add(future.get());
			}
			return outputs;
		} finally {
			// Without this in a finally block, a failing task (or a null input
			// list) leaks the pool's non-daemon threads and keeps the JVM alive.
			// On success all tasks are done, so shutdownNow() only differs from
			// shutdown() on failure, where it interrupts the remaining tasks.
			executor.shutdownNow();
		}
	}

	/**
	 * Applies the function fun to all inputs collecting the outputs in the
	 * returned list.
	 */
	static public <I, O> List<O> map(final Function<I, O> fun, List<I> inputs) {
		List<O> outputs = new ArrayList<O>(inputs.size());
		for (I input : inputs) {
			outputs.add(fun.apply(input));
		}
		return outputs;
	}

	/**
	 * Folds an element of type {@code E} into a partial result of type
	 * {@code R}. Used by {@link MapReduce#reduce}. {@code partial} is
	 * {@code null} on the first call, and {@code element} is {@code null} when
	 * there are no elements to reduce.
	 */
	public interface Combination<E, R> {
		public R combine(R partial, E element);
	}

	/**
	 * Combines the elements of the list one by one, in order, into a single
	 * result using the combination function com. The first call receives a
	 * {@code null} partial result, so com is responsible for creating the
	 * initial value.
	 * 
	 * If elements is null or empty, com.combine(null, null) is returned. This
	 * lets com supply its "empty" result, so an empty list and a null list
	 * behave the same.
	 */
	static public <E, R> R reduce(Combination<E, R> com, List<E> elements) {
		if (elements == null || elements.isEmpty()) {
			return com.combine(null, null);
		}
		R partial = null;
		for (E element : elements) {
			partial = com.combine(partial, element);
		}
		return partial;
	}

}

TermFrequency

package mapreduce;

import static mapreduce.MapReduce.mapInParallel;
import static mapreduce.MapReduce.reduce;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import mapreduce.MapReduce.Combination;
import mapreduce.MapReduce.Function;

public class TermFrequency {

	/**
	 * A map entry that pairs a word with the fixed count 1. splitWords emits
	 * one per non-stop-word occurrence, and countWords sums their values.
	 */
	static class CountOnceEntry implements Entry<String, Integer> {
	
		private final String key;
	
		public CountOnceEntry(String key) {
			this.key = key;
		}
	
		@Override
		public String getKey() {
			return key;
		}
	
		@Override
		public Integer getValue() {
			return 1;
		}
	
		/** The value is fixed at 1: the argument is ignored and 1 is returned. */
		@Override
		public Integer setValue(Integer ignored) {
			return 1;
		}

		/** Compares by key and value, as required by the Map.Entry contract. */
		@Override
		public boolean equals(Object other) {
			if (!(other instanceof Entry)) {
				return false;
			}
			Entry<?, ?> that = (Entry<?, ?>) other;
			boolean sameKey = (key == null) ? that.getKey() == null : key.equals(that.getKey());
			return sameKey && Integer.valueOf(1).equals(that.getValue());
		}

		/** Hash code as specified by Map.Entry: key hash XOR value hash. */
		@Override
		public int hashCode() {
			return ((key == null) ? 0 : key.hashCode()) ^ Integer.valueOf(1).hashCode();
		}
		
		@Override
		public String toString() {
			return "[key = " + key + ", value = 1]";
		}
	
	}

	/**
	 * Takes a string data and splits it into chunks of at least the size
	 * minChunkSize (only the last chunk may be shorter). Each chunk is extended
	 * up to the next non-alphabetic character, so a split never happens inside
	 * a word. Concatenating the chunks yields data again.
	 * 
	 * @throws IllegalArgumentException
	 *             if minChunkSize is not positive (the loop would not advance)
	 */
	static List<String> partition(String data, int minChunkSize) {
		if (minChunkSize <= 0) {
			throw new IllegalArgumentException("minChunkSize must be positive: " + minChunkSize);
		}
		int length = data.length();
		List<String> result = new ArrayList<String>(length / minChunkSize + 1);
		int previousEnd = 0;
		while (previousEnd < length) {
			int nextEnd = previousEnd + minChunkSize;
			// Move the split point forward until it sits on a non-letter.
			while (nextEnd < length && Character.isAlphabetic(data.charAt(nextEnd))) {
				nextEnd++;
			}
			if (nextEnd >= length) {
				nextEnd = length;
			}
			result.add(data.substring(previousEnd, nextEnd));
			previousEnd = nextEnd;
		}
		return result;
	}

	/**
	 * Words ignored by splitWords. main sets this before mapping; if it is
	 * null, no words are filtered.
	 */
	static Set<String> stopWords = null;

	/**
	 * Reads the whole file and decodes it as UTF-8. Files.readAllBytes reads
	 * the file completely and always closes it. A single InputStream.read call
	 * may return fewer bytes than requested.
	 */
	private static String readFile(String pathToFile) throws IOException {
		byte[] rawData = Files.readAllBytes(Paths.get(pathToFile));
		return new String(rawData, StandardCharsets.UTF_8);
	}

	/**
	 * Reads a comma-separated list of stop words and adds every single letter
	 * a-z. Entries are trimmed and lower-cased so that surrounding whitespace
	 * (e.g. a trailing newline at the end of the file) does not prevent
	 * matching the lower-cased words produced by splitWords.
	 */
	private static Set<String> readStopWords(String pathToFile) throws FileNotFoundException, IOException {
		Set<String> stopWords = new HashSet<String>();
		for (String stopWord : readFile(pathToFile).split(",")) {
			String word = stopWord.trim().toLowerCase(Locale.ROOT);
			if (!word.isEmpty()) {
				stopWords.add(word);
			}
		}
		for (char c = 'a'; c <= 'z'; c++) {
			stopWords.add(Character.toString(c));
		}
		return stopWords;
	}

	/**
	 * Returns the entries of map sorted by value in descending order. The sort
	 * is stable, so entries with equal values keep the map's iteration order.
	 */
	private static <K, V extends Comparable<V>> List<Entry<K, V>> sort(Map<K, V> map) {
		List<Entry<K, V>> result = new ArrayList<Entry<K, V>>(map.entrySet());
		Comparator<Entry<K, V>> comparator = new Comparator<Entry<K, V>>() {
			@Override
			public int compare(Entry<K, V> left, Entry<K, V> right) {
				// Swap the operands rather than negating compareTo: compareTo may
				// legally return Integer.MIN_VALUE, whose negation overflows.
				return right.getValue().compareTo(left.getValue());
			}
		};
		Collections.sort(result, comparator);
		return result;
	}

	/**
	 * Map step: lower-cases a chunk of text, splits it into runs of letters
	 * a-z, and emits a CountOnceEntry for every word not in stopWords.
	 */
	static Function<String, List<Entry<String, Integer>>> splitWords = //
	new Function<String, List<Entry<String, Integer>>>() {

		@Override
		public List<Entry<String, Integer>> apply(String input) {

			List<Entry<String, Integer>> result = new ArrayList<Entry<String, Integer>>();
	
			// Locale.ROOT: a default-locale toLowerCase() maps 'I' to a dotless
			// 'ı' under Turkish locales, which [^a-z] then treats as a delimiter.
			Scanner scanner = new Scanner(input.toLowerCase(Locale.ROOT));
			try {
				scanner.useDelimiter("[^a-z]+");
				while (scanner.hasNext()) {
					String word = scanner.next();
					if (stopWords == null || !stopWords.contains(word)) {
						result.add(new CountOnceEntry(word));
					}
				}
			} finally {
				scanner.close();
			}

			return result;
		}
	};

	/**
	 * Reduce step: adds the values of every entry in element to the running
	 * per-word totals in partial, creating the map on the first call. A null
	 * element returns the totals unchanged.
	 */
	static Combination<List<Entry<String, Integer>>, Map<String, Integer>> countWords = //
	new Combination<List<Entry<String, Integer>>, Map<String, Integer>>() {

		@Override
		public Map<String, Integer> combine(Map<String, Integer> partial, List<Entry<String, Integer>> element) {
			if (partial == null) {
				partial = new HashMap<String, Integer>();
			}
			if (element == null) {
				return partial;
			}
			for (Entry<String, Integer> entry : element) {
				String key = entry.getKey();
				Integer value = entry.getValue();
				Integer count = partial.get(key);
				partial.put(key, (count == null) ? value : count + value);
			}
			return partial;
		}
	};

	/**
	 * Prints the 25 most frequent non-stop words of pride-and-prejudice.txt as
	 * "word - count", one per line. The text is split into chunks of at least
	 * 200*80 characters, each chunk is mapped in parallel, and the results are
	 * reduced into a single frequency map.
	 */
	public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {

		stopWords = readStopWords("stop_words.txt");
		List<String> partitions = partition(readFile("pride-and-prejudice.txt"), 200 * 80);

		List<List<Entry<String, Integer>>> splits = mapInParallel(splitWords, partitions);
		Map<String, Integer> wordFrequencies = reduce(countWords, splits);

		Iterator<Entry<String, Integer>> it = sort(wordFrequencies).iterator();
		for (int printed = 0; it.hasNext() && (printed < 25); printed++) {
			Entry<String, Integer> frequency = it.next();
			System.out.println(frequency.getKey() + " - " + frequency.getValue());
		}

	}

}
teaching/seminars/style/2014/map_reduce.txt · Last modified: 2018/05/24 15:14 by daniel

SEWiki, © 2024