Пришлось повозиться.
Используем MongoDB+Hadoop Connector, не забываем про avro без него не будет работать
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-hadoop-core_cdh4b1</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>avro</artifactId>
<version>1.1.0</version>
</dependency>
1. MongoConfigUtil.setOutputURI - надо ставить до вызова Job job = new Job(configuration, JOB_NAME);
Иначе получаем ошибку связанную с невозможностью подключиться к Монге, дело в том что setOutputURI ставит параметры в configuration, которые Job копирует при создании.<groupId>org.mongodb</groupId>
<artifactId>mongo-hadoop-core_cdh4b1</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>avro</artifactId>
<version>1.1.0</version>
</dependency>
2. Надо обязательно ставить выходной тип ключа для редьюса MongoConfigUtil.setOutputKey(configuration, Text.class);
иначе по умолчанию MongoConfigUtil ставит BSON, и становится тяжело понять, почему валится job
3. Указываем Output Format job.setOutputFormatClass(MongoOutputFormat.class);
public static Job createJob(Configuration configuration, String... paths) throws IOException, JSONException {
MongoConfigUtil.setOutputURI(
configuration, "mongodb://mongo-stg-2:27102/buffer.jobout"
);
MongoConfigUtil.setOutputKey(configuration, Text.class);
Job job = new Job(configuration, JOB_NAME);
job.setJarByClass(MongoWriterJob.class);
job.setReducerClass(MongoWriterReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(ResultJSONWritable.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DBObject.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
for (String path : paths) {
FileInputFormat.addInputPath(job, new Path(path));
}
return job;
}
MongoConfigUtil.setOutputURI(
configuration, "mongodb://mongo-stg-2:27102/buffer.jobout"
);
MongoConfigUtil.setOutputKey(configuration, Text.class);
Job job = new Job(configuration, JOB_NAME);
job.setJarByClass(MongoWriterJob.class);
job.setReducerClass(MongoWriterReducer.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(ResultJSONWritable.class);
job.setOutputFormatClass(MongoOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DBObject.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
for (String path : paths) {
FileInputFormat.addInputPath(job, new Path(path));
}
return job;
}
Ну и наконец, reducer на выходе должен отдавать DBObject :
public static class MongoWriterReducer extends Reducer<
ImmutableBytesWritable,
ResultJSONWritable,
Text,
DBObject> {...
ImmutableBytesWritable,
ResultJSONWritable,
Text,
DBObject> {...
Комментариев нет:
Отправить комментарий