Есть ли способ, с помощью которого каждый процесс-редуктор мог бы определить количество элементов или записей, которые он должен обработать?
Hadoop: количество входных записей для редуктора
- Вы пытаетесь получить количество значений, связанных с данным ключом в вашем классе редуктора? 20.12.2012
- @ryanbwork да. Кстати, у меня сложилось впечатление, что каждый редуктор будет работать с одним уникальным ключом и связанными с ним значениями, верно? 20.12.2012
- @ryanbwork Не вижу твоего ответа. 20.12.2012
- @ryanbwork это неправильно. Редуктор получит один уникальный ключ за один раз, но один редьюсер получит несколько ключей до завершения сопоставления/свертывания. 20.12.2012
- @ArnonRotem-Gal-Oz, спасибо за разъяснения; поэтому один экземпляр редуктора повторно используется для всех ключей, отправленных на данный узел редуктора? 20.12.2012
- @ryanbwork не совсем то, что узел может иметь несколько слотов редуктора, и данное задание карты / сокращения может использовать более одного слота на данном узле. Гарантируется, что один редьюсер получит все соответствующие данные (т.е. все, сгруппированные по ключу). 20.12.2012
- @ArnonRotem-Gal-Oz - слот == экземпляр? И если это так, то не означает ли это, что данный редьюсер может обрабатывать несколько наборов пар K/V (при условии, что # ключей › # слотов). 20.12.2012
- слот, если это потенциальный экземпляр. Hadoop решает, сколько слотов получит данное задание сопоставления/уменьшения (на основе количества необходимых редьюсеров, приоритета и т. д.). Кстати, то же самое касается и картографов. Слот, назначенный заданию, является экземпляром, и он может получать и обрабатывать несколько K/массивов значений (сгруппированных преобразователями/объединителями). 21.12.2012
- @ ArnonRotem-Gal-Oz еще раз спасибо за разъяснения, удалил мой предыдущий комментарий. 21.12.2012
Ответы:
Ваш класс редуктора должен расширять класс редуктора MapReducer:
Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
а затем должен реализовать метод сокращения, используя аргументы KEYIN/VALUEIN, указанные в расширенном классе Reduce.
reduce(KEYIN key, Iterable<VALUEIN> values,
org.apache.hadoop.mapreduce.Reducer.Context context)
Значения, связанные с данным ключом, могут быть подсчитаны с помощью
int count = 0;
Iterator<VALUEIN> it = values.iterator();
while(it.hasNext()){
it.Next();
count++;
}
Хотя я бы предложил сделать этот подсчет вместе с другой вашей обработкой, чтобы не делать два прохода через ваш набор значений.
РЕДАКТИРОВАТЬ
Вот пример вектора векторов, который будет динамически расти по мере добавления к нему (поэтому вам не придется статически объявлять свои массивы и, следовательно, не нужен размер набора значений). Это будет лучше всего работать для нерегулярных данных (т.е. количество столбцов не одинаково для каждой строки вашего входного CSV-файла), но будет иметь наибольшие накладные расходы.
Vector table = new Vector();
Iterator<Text> it = values.iterator();
while(it.hasNext()){
Text t = it.Next();
String[] cols = t.toString().split(",");
int i = 0;
Vector row = new Vector(); //new vector will be our row
while(StringUtils.isNotEmpty(cols[i])){
row.addElement(cols[i++]); //here were adding a new column for every value in the csv row
}
table.addElement(row);
}
Затем вы можете получить доступ к M-му столбцу N-й строки через
table.get(N).get(M);
Теперь, если бы вы знали, что количество столбцов будет установлено, вы могли бы изменить это, чтобы использовать вектор массивов, который, вероятно, был бы немного быстрее/эффективнее.
Короткий ответ - заранее нет, редуктор не знает, сколько значений поддерживается итерацией. Единственный способ сделать это - считать по мере повторения, но вы не можете повторно повторить итерацию снова.
Длинный ответ - поддержка итерации на самом деле представляет собой отсортированный массив байтов сериализованных пар ключ/значение. Редуктор имеет два компаратора: один для сортировки пар ключ/значение в ключевом порядке, второй для определения границы между ключами (известный как группировщик ключей). Обычно группировщик ключей совпадает с компаратором порядка ключей.
При переборе значений для определенного ключа базовый контекст проверяет следующий ключ в массиве и сравнивает с предыдущим ключом, используя компаратор группировки. Если компаратор определяет, что они равны, то итерация продолжается. В противном случае итерация для этого конкретного ключа заканчивается. Таким образом, вы можете видеть, что вы не можете заранее определить, какие значения будут переданы для любого конкретного ключа.
Вы можете увидеть это в действии, если создадите составной ключ, скажем, пару Text/IntWritable. Для метода compareTo отсортируйте сначала текст, а затем поле IntWritable. Затем создайте компаратор, который будет использоваться в качестве группового компаратора, который учитывает только текстовую часть ключа. Теперь, когда вы повторяете значения в редюсере, вы должны иметь возможность наблюдать IntWritable часть ключа, изменяющуюся с каждой итерацией.
Некоторый код, который я использовал ранее для демонстрации этого сценария, можно найти в этом pastebin.
Iterable
? 16.06.2014