четверг, 28 февраля 2013 г.

Экспорт Hadoop - MapReduce в MongoDB

Понадобилось направить в MongoDB выход хадуповского M/R .
Пришлось повозиться.

Используем MongoDB+Hadoop Connector, не забываем про avro без него не будет работать
<dependency>
   <groupId>org.mongodb</groupId>
   <artifactId>mongo-hadoop-core_cdh4b1</artifactId>
   <version>1.0.0</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>avro</artifactId>
   <version>1.1.0</version>
</dependency>
1. MongoConfigUtil.setOutputURI - надо ставить до вызова   Job job = new Job(configuration, JOB_NAME); Иначе получаем ошибку связанную с невозможностью подключиться к Монге, дело в том что setOutputURI ставит параметры в configuration, которые Job копирует при создании.

2. Надо обязательно ставить выходной тип ключа для редьюса MongoConfigUtil.setOutputKey(configuration, Text.class);
иначе по умолчанию MongoConfigUtil ставит BSON, и становится тяжело понять, почему валится job

3. Указываем Output Format  job.setOutputFormatClass(MongoOutputFormat.class);
public static Job createJob(Configuration configuration,  String... paths)   throws IOException, JSONException {
        MongoConfigUtil.setOutputURI(
            configuration, "mongodb://mongo-stg-2:27102/buffer.jobout"
        );           
        MongoConfigUtil.setOutputKey(configuration, Text.class);
        Job job = new Job(configuration, JOB_NAME);
        job.setJarByClass(MongoWriterJob.class);
        job.setReducerClass(MongoWriterReducer.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(ResultJSONWritable.class);
        job.setOutputFormatClass(MongoOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DBObject.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        for (String path : paths) {
            FileInputFormat.addInputPath(job, new Path(path));
        }
        return job;
  }

Ну и наконец, reducer на выходе должен отдавать DBObject :
public static class MongoWriterReducer extends Reducer<
            ImmutableBytesWritable,
            ResultJSONWritable,
            Text,
            DBObject> {...


Комментариев нет:

Отправить комментарий