пятница, 22 февраля 2013 г.

MongoDB map-reduce, интересные нюансы

Задача:
Необходимо сгруппировать объекты по атрибуту source_name, и посчитать количество таких объектов для каждой группы.

Решение:
В MongoDB есть Aggregation framework, он позволяет в том числе запускать m/r задачи.
Надо сказать что в Монге какой то странный мэп-редьюс. Во первых он однопоточный, что противоречит самой парадигме m/r, а во вторых с редьюсами происходит что то не понятное, что запутывает окончательно.

Первое решение (логичное, но не правильное) было таким:
Всё как обычно, каждому ключу ставим значение 1, в редьюсе подсчитываем их количество
db.buffer.mapReduce(
    "function() {
         emit(this.source_name, 1);
    }",
    "function(k, vals) {
         var count = vals.length;
         return {count: count};
    }",
    {out: "buffer_results"}
);
out:
{
    "result" : "buffer_results",
    "timeMillis" : 70258,
    "counts" : {
        "input" : 2803200,
        "emit" : 2803200,
        "reduce" : 166542,
        "output" : 13
     },
   "ok" : 1,
}

{ "_id" : "source_A", "value" : { "count" : 2601 } }
{ "_id" : "source_B", "value" : { "count" : 50456 } }
{ "_id" : "source_C", "value" : { "count" : 23422 } }
{ "_id" : "source_D", "value" : { "count" : 725523 } }
{ "_id" : "source_E", "value" : { "count" : 234 } }
{ "_id" : "source_F", "value" : { "count" : 123111 } }
{ "_id" : "source_G", "value" : { "count" : 4 } }
{ "_id" : "source_H", "value" : { "count" : 1 } }
{ "_id" : "source_I", "value" : { "count" : 1 } }
{ "_id" : "source_J", "value" : { "count" : 7363 } }
{ "_id" : "source_K", "value" : { "count" : 560 } }
{ "_id" : "source_L", "value" : { "count" : 10 } }
Если суммировать все count, должно получиться  2803200, но не получается. Такое впечатление, что это результат какого то не полного редьюса.

И действительно,  выясняется интересный нюанс. Редьюс может быть несколько раз вызван для одного и того же ключа. Невероятно, но факт! Это связанно с архитектурой Монги, m/r может выполняться на кластере,  документы с одним и тем же ключом могут находится на разных серверах. Сначала редьюс по ключу выполняется на одном сервере, а затем на другом.
После чего происходит финальный редьюс, в который попадают результаты выполнения первых двух. Т.е входом в редьюс могут быть как значения выхода мэпа, так и редьюса.



Теперь всё встало на свои места. В сложившейся ситуации, код m/r должен быть таким:
db.buffer.mapReduce(
   "function() {
       emit(this.source_name, {count: 1});
   }",
   "function(key, values) {
      var count = 0; 
      values.forEach(function(v) {count += v['count'];}); 
      return {count: count};
   }", 
   {out: "buffer_results"}
);
Мэп и редьюс генерируют одинаковые объекты на выходе. Поэтому редьюсу без разницы, от кого пришли данные.
out:
{
    "result" : "buffer_results",
    "timeMillis" : 70258,
    "counts" : {
            "input" : 2803200,
                "emit" : 2803200,
            "reduce" : 166542,
            "output" : 13
    },
    "ok" : 1,
}

{ "_id" : "source_A", "value" : { "count" : 2601 } }

{ "_id" : "source_B", "value" : { "count" : 1380432 } }
{ "_id" : "source_C", "value" : { "count" : 106087 } }
{ "_id" : "source_D", "value" : { "count" : 854294 } }
{ "_id" : "source_E", "value" : { "count" : 100509 } }
{ "_id" : "source_F", "value" : { "count" : 151713 } }
{ "_id" : "source_G", "value" : { "count" : 4 } }
{ "_id" : "source_H", "value" : { "count" : 1 } }
{ "_id" : "source_I", "value" : { "count" : 1 } }
{ "_id" : "source_J", "value" : { "count" : 72363 } }
{ "_id" : "source_K", "value" : { "count" : 2560 } }
{ "_id" : "source_L", "value" : { "count" : 10 } }

В итоге получаем правильные результаты. Дебит сходится с кредитом.






Комментариев нет:

Отправить комментарий