четверг, 28 февраля 2013 г.

Экспорт Hadoop - MapReduce в MongoDB

Понадобилось направить в MongoDB выход хадуповского M/R .
Пришлось повозиться.

Используем MongoDB+Hadoop Connector, не забываем про avro без него не будет работать
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongo-hadoop-core_cdh4b1</artifactId>
   <version>1.0.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>avro</artifactId>
   <version>1.1.0</version>
</dependency>
1. MongoConfigUtil.setOutputURI - надо ставить до вызова   Job job = new Job(configuration, JOB_NAME); Иначе получаем ошибку связанную с невозможностью подключиться к Монге, дело в том что setOutputURI ставит параметры в configuration, которые Job копирует при создании.

2. Надо обязательно ставить выходной тип ключа для редьюса MongoConfigUtil.setOutputKey(configuration, Text.class);
иначе по умолчанию MongoConfigUtil ставит BSON, и становится тяжело понять, почему валится job

3. Указываем Output Format  job.setOutputFormatClass(MongoOutputFormat.class);
public static Job createJob(Configuration configuration,  String... paths)   throws IOException, JSONException {
        MongoConfigUtil.setOutputURI(
            configuration, "mongodb://mongo-stg-2:27102/buffer.jobout"
        );           
        MongoConfigUtil.setOutputKey(configuration, Text.class);
        Job job = new Job(configuration, JOB_NAME);
        job.setJarByClass(MongoWriterJob.class);
        job.setReducerClass(MongoWriterReducer.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(ResultJSONWritable.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DBObject.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        for (String path : paths) {
            FileInputFormat.addInputPath(job, new Path(path));
        }
        return job;
  }

Ну и наконец, reducer на выходе должен отдавать DBObject :
public static class MongoWriterReducer extends Reducer<
            ImmutableBytesWritable,
            ResultJSONWritable,
            Text,
            DBObject> {...


пятница, 22 февраля 2013 г.

MongoDB map-reduce, интересные нюансы

Задача:
Необходимо сгруппировать объекты по атрибуту source_name, и посчитать количество таких объектов для каждой группы.

Решение:
В MongoDB есть Aggregation framework, он позволяет в том числе запускать m/r задачи.
Надо сказать что в Монге какой то странный мэп-редьюс. Во первых он однопоточный, что противоречит самой парадигме m/r, а во вторых с редьюсами происходит что то не понятное, что запутывает окончательно.

Первое решение (логичное, но не правильное) было таким:
Всё как обычно, каждому ключу ставим значение 1, в редьюсе подсчитываем их количество
db.buffer.mapReduce(
    "function() {
         emit(this.source_name, 1);
    }",
    "function(k, vals) {
         var count = vals.length;
         return {count: count};
    }",
    {out: "buffer_results"}
);
out:
{
    "result" : "buffer_results",
    "timeMillis" : 70258,
    "counts" : {
        "input" : 2803200,
        "emit" : 2803200,
        "reduce" : 166542,
        "output" : 13
     },
   "ok" : 1,
}

{ "_id" : "source_A", "value" : { "count" : 2601 } }
{ "_id" : "source_B", "value" : { "count" : 50456 } }
{ "_id" : "source_C", "value" : { "count" : 23422 } }
{ "_id" : "source_D", "value" : { "count" : 725523 } }
{ "_id" : "source_E", "value" : { "count" : 234 } }
{ "_id" : "source_F", "value" : { "count" : 123111 } }
{ "_id" : "source_G", "value" : { "count" : 4 } }
{ "_id" : "source_H", "value" : { "count" : 1 } }
{ "_id" : "source_I", "value" : { "count" : 1 } }
{ "_id" : "source_J", "value" : { "count" : 7363 } }
{ "_id" : "source_K", "value" : { "count" : 560 } }
{ "_id" : "source_L", "value" : { "count" : 10 } }
Если суммировать все count, должно получиться  2803200, но не получается. Такое впечатление, что это результат какого то не полного редьюса.

И действительно,  выясняется интересный нюанс. Редьюс может быть несколько раз вызван для одного и того же ключа. Невероятно, но факт! Это связанно с архитектурой Монги, m/r может выполняться на кластере,  документы с одним и тем же ключом могут находится на разных серверах. Сначала редьюс по ключу выполняется на одном сервере, а затем на другом.
После чего происходит финальный редьюс, в который попадают результаты выполнения первых двух. Т.е входом в редьюс могут быть как значения выхода мэпа, так и редьюса.



Теперь всё встало на свои места. В сложившейся ситуации, код m/r должен быть таким:
db.buffer.mapReduce(
   "function() {
       emit(this.source_name, {count: 1});
   }",
   "function(key, values) {
      var count = 0; 
      values.forEach(function(v) {count += v['count'];}); 
      return {count: count};
   }", 
   {out: "buffer_results"}
);
Мэп и редьюс генерируют одинаковые объекты на выходе. Поэтому редьюсу без разницы, от кого пришли данные.
out:
{
    "result" : "buffer_results",
    "timeMillis" : 70258,
    "counts" : {
            "input" : 2803200,
                "emit" : 2803200,
            "reduce" : 166542,
            "output" : 13
    },
    "ok" : 1,
}

{ "_id" : "source_A", "value" : { "count" : 2601 } }

{ "_id" : "source_B", "value" : { "count" : 1380432 } }
{ "_id" : "source_C", "value" : { "count" : 106087 } }
{ "_id" : "source_D", "value" : { "count" : 854294 } }
{ "_id" : "source_E", "value" : { "count" : 100509 } }
{ "_id" : "source_F", "value" : { "count" : 151713 } }
{ "_id" : "source_G", "value" : { "count" : 4 } }
{ "_id" : "source_H", "value" : { "count" : 1 } }
{ "_id" : "source_I", "value" : { "count" : 1 } }
{ "_id" : "source_J", "value" : { "count" : 72363 } }
{ "_id" : "source_K", "value" : { "count" : 2560 } }
{ "_id" : "source_L", "value" : { "count" : 10 } }

В итоге получаем правильные результаты. Дебит сходится с кредитом.






Фтопку !

Здесь буду перечислять всякие неприятности, о которых стоит помнить в следующий раз...

-------------------------------------------------------------------------------------------
au.com.bytecode.opencsv.CSVReader в топку !

Жутко заглючил у заказчика.
Если от парсера не требуется мэппинг в POJO, то лучше свой написать.
Всего то 10 строк кода.
-------------------------------------------------------------------------------------------

среда, 20 февраля 2013 г.

Интеграция Hive c HBase (грабли)

Hive - замечательная вещь. Про то, что это и зачем это, написано уже не мало.
В основном все примеры описывают, как делать запросы по хадуповским файлам.
А вот как делать HQL запросы в HBase - информации мало.

Т.к без слёз делать выборки в HBase не получается, пришлось прикручивать Hive

Порядок действий прост.
Создаём в консоли Hive, таблицу связанную с таблицей в HBase, настраиваем мэппинг полей
hive> CREATE EXTERNAL TABLE hive_raw_data(key string,  dmp_id string,  referrer string,  source_id string,  source_name string)
    > STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    > WITH SERDEPROPERTIES ("hbase.columns.mapping" = "data:dmp_id,  data:referrer,  data:source_id,  data:source_name")      
    > TBLPROPERTIES("hbase.table.name" = "raw_data");

Ууупс, первые грабли. Система не внятно ругается, и никак не создает таблицу. Понять в чем дело - трудно. Оказывается при описании мэппинга ("hbase.columns.mapping" = "data:dmp_id,  data:referrer,  data:source_id,  data:source_name"), не должно быть пробелов между полями! Должно быть вот так: "hbase.columns.mapping" = "data:dmp_id,data:referrer,data:source_id,data:source_name"

Исправляем, и... таблица создана.
запускаем тест select * from hive_row_data и видим что всё работает.
Но радоваться рано, простой select запускает обычный перебор, а как только запрос становится чуть сложнее например select * from hive_row_data where key = 'xxx' вот и начинает всё валится (ибо запускается механизм Map - reduce).
В итоге получаем исключение, вот такого плана:
Exception in thread "Thread-48" java.lang.NullPointerException
    at org.apache.hadoop.hive.shims.Hadoop23Shims.getTaskAttemptLogUrl(Hadoop23Shims.java:44)
    at  org.apache.hadoop.hive.ql.exec.JobDebugger$TaskInfoGrabber.getTaskInfos
    (JobDebugger.java:186)
    at org.apache.hadoop.hive.ql.exec.JobDebugger$TaskInfoGrabber.run(JobDebugger.java:142)
    at java.lang.Thread.run(Thread.java:662) 
На самом деле, те исключения которые валятся в консоль, можно игнорировать. Смотреть надо лог джоба. Залазим - смотрим:
INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201302201311_0003_m_000000_3: java.io.IOException: Cannot create an instance of InputSplit class = org.apache.hadoop.hive.hbase.HBaseSplit:Class     org.apache.hadoop.hive.hbase.HBaseSplit not found at       org.apache.hadoop.hive.ql.io.HiveInputFormat$HiveInputSplit.readFields(HiveInputFormat.java:146)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:73)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:44)
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:351)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:367)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:327)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
at org.apache.hadoop.mapred.Child.main(Child.java:2
Это значит что map-reduce задачи, запускаемые Hive-ом ничего не знают про HBase, и просто не могут найти нужные jar. Беда в том, что эти библиотеки нужны на всех нодах.

Решение здесь такое:
1. Сохраняем нужные Jar в HDFS (/user/hive/)
hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar

в моем случае это выглядит так:

hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-0.9.0-cdh4.1.3.jar /user/hive/hive-hbase-handler-0.9.0-cdh4.1.3.jar
hadoop dfs -put /usr/lib/hive/lib/hbase-0.92.1-cdh4.1.3-security.jar /user/hive/hbase-0.92.1-cdh4.1.3-security.jar

Затем правим hive-site.xml:
<property>
    <name>hive.aux.jars.path</name>
    <value>/user/hive/hive-hbase-handler-0.9.0-cdh4.1.3.jar,/user/hive/hbase-0.92.1-cdh4.1.3-security.jarN.jar</value>
</property>
Вот и всё. После этого Hive начинает исправно выполнять сложные выборки.
Работаем и наслаждаемся.

https://cwiki.apache.org/Hive/hbasebulkload.html





суббота, 16 февраля 2013 г.

Hadoop - Слоники большие и маленькие

Hadoop в последнее время шагает в массы. Вот и мне довелось с ним работать.
Отличная система, но грабельки большие и маленькие всё таки там раскиданы.
Вроде бы мелочи, но на решение каждой уходит по пол дня.

И так, во первых Hadoop бывает в нескольких дистрибутивных исполнениях:

1. Apache, как есть http://hadoop.apache.org/releases.html
Hadoop в чистом виде. Ставить на всех нодах надо руками, править конфиги. Учить его видеть HBase. В общем надо с головой уходить в администрирование. Свои плюсы в этом конечно есть, но где взять столько времени ?

2. Cloudera http://www.cloudera.com
Половина, из создателей Hadoop организовала свою кантору и свой дистрибутив.
Главная его изюминка - это ClouderaManager, с помощью которого можно очень легко управлять кластером, мониторить, добавлять ноды. Ставиться всё в два клика, система разворачивается на кластере, всё нужное в конфигах прописывается автоматически. В общем прелесть.
Это коммерческая система, но есть бесплатная версия, с ограничением в 50 нод на кластер.

Так выглядит Web консоль ClouderaManager


3. MapR http://www.mapr.com/
Другая половина создателей Hadoop решала сделать свой лунопарк, и выпустила свой дистрибутив. Всё как в Cloudera или в Cloudera всё как в MapR. Плюс комунити, форум, и отсутствие ограничений на размер кластера. Вобщем тоже красота.

MapR я не пробовал, а начал работать с Cloudera, по этому о всех изысках по порядку:

1. У Cloudera свой мавновский репозиторий. 
И необходимо использовать именно их зависимости в купе с версиями системы на вашем кластере
---------------------------------------------------------------------------
 <repositories>
     <repository>
         <id>apache</id>
         <url>https://repository.apache.org/content/repositories/releases/
         </url>
     </repository>
     <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/
        </url>
    </repository>
</repositories>
---------------------------------------------------------------------------

<dependency>
     <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.0.0-mr1-cdh4.1.2</version>
      <scope>provided</scope>
</dependency>
---------------------------------------------------------------------------

2. Минимизация Super Jar
Я привык собирать super jar. Это удобно, все нужные зависимости находятся уже внутри.
Таким способом можно избежать возможных проблем в продакшене.
Но как оказалось, для map - reduce такой подход опасен. Когда в jar включаются клоудеровские зависимости, в run time начинается какой то бардак. Такое впечатление что вместо одних методов начинают вызываются совсем другие. Поэтому из конечного архива пришлось исключить все клоудеровские зависимости. Сделать это можно просто, добавив ключ  <scope>provided</scope> к мавновской зависимости. После этого m/r работает как надо

Но если надо работать с HBase, то следует полдностью включить org.apache.hbase
Иначе хадуп просто не видит этих библиотек

3. Map-reduce значения по умолчанию
Очень важно непосредственно определять входные и выходные форматы Job-а, т.к по умолчанию они всегда текстовые.

job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

Тоже самое касается форматов входа и выхода Mapper-ов и Reducer-ов. Особенно болезненно это проявляется в задачах со сцепленными Job-ми

job.setOutputKeyClass(ReportReduceWritable.class);   //Отдельно для Reduce
job.setOutputValueClass(MyKeyWritable.class);

job.setMapOutputKeyClass(Text.class);                           //Отдельно для Map
job.setMapOutputValueClass(MyReduceWritable.class);

4. Удаление директорий
Перед запуском m/r задачи надо удалять все выходные директории, которые были созданы после предыдущего запуска. Нначе Job будет ругаться и вываливаться.

5. Итератор Reducer-a нельзя использовать повторно
Интересная грабелька. Когда мы имплементируем org.apache.hadoop.mapreduce.Reducer
на вход приходит некий итератор Iterable<MyValue> values. Он позволяет считать данные только один раз. После этого перевести курсор в начало не возможно. Приходится выкручиваться сохранением элементов во временную коллекцию.


6. Запуск Job от имени другого пользователя и TableMapReduceUtil.initTableMapperJob
Убили на эти грабли день, прежде чем понять в чем дело.
Запускаем Job по крону от имени специального - системного пользователя
dmpservice. Валится !

java.io.IOException: java.lang.RuntimeException: java.io.IOException: Permission denied
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.findOrCreateJar(TableMapReduceUtil.java:521)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:472)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:438)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:138)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:215)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:81)
at ru.crystaldata.analyticengine.mr.HBaseReaderJob.createJob(HBaseReaderJob.java:206)
at ru.crystaldata.analyticengine.JobsRunner.main(JobsRunner.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
Caused by: java.lang.RuntimeException: java.io.IOException: Permission denied
at org.apache.hadoop.util.JarFinder.getJar(JarFinder.java:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.findOrCreateJar(TableMapReduceUtil.java:518)
... 12 more
Caused by: java.io.IOException: Permission denied
at java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.io.File.checkAndCreate(File.java:1704)
at java.io.File.createTempFile(File.java:1792)
at org.apache.hadoop.util.JarFinder.getJar(JarFinder.java:156)
... 17 more

Делаем всё тоже самое но от обычного пользователя - работает.
Убираем из джоба HBase - мепперы, запускаем от dmpservice - работает.
Вывод: что то темное творится в недрах TableMapReduceUtil.initTableMapperJob

Как оказалось TableMapReduceUtil.initTableMapperJob создает темповый файл для своих внутренних нужд, и пытается разместить его в HOME, а так как у пользователя dmpservice,
не заданна данная переменная, то запись производится в root /, на который естественно нет прав.

Лечится - при создании задачи крону, прописываем HOME=var/tmp


Пока это всё. Удачи в начинаниях :)