пятница, 29 ноября 2013 г.

Инструкция по установке Twitter Storm

Storm:  distributed and fault-tolerant realtime computation
По горячим следам, пока это еще в голове. На первый взгляд всё просто, но поприсидать пришлось.

Последовательность такая:

1.  Ставим Zookeeper
2.  Ставим зависимости для Nimbus и рабочих машин
3.  Скачиваем и ставим Storm на Nimbus и рабочие машины
4.  Правим конфиг Шторма  storm.yaml
5.  Запускаем демонов

Настраиваем мастер (Nimbus):


  • Zookeeper
    Для тестового кластера хватит одной ноды. За одно поставим утилиты и библиотеки, необходимые для Nimbus

apt-get install openjdk-6-jdk zookeeper make build-essential
apt-get install uuid-dev unzip pkg-config libtool automake

  • apache
    Для UI нужен веб сервер
apt-get install apache2


Далее настройка Нимбуса и рабочих нод одинакова.
Ставим зависимости. Storm требует ZeroMQ 2.1.7 и JZMQ.
  •  ZerroMQ 2.1.7Нужна версия 2.1.7 ибо 2.1.10 глюкава

sudo aptitude install build-essential
cd ~
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
tar zxvf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7
./configure
make
sudo make install
sudo ldconfig


  • JZMQ
    С ним пришлось повозиться. Ниже приведен самый рабочий способ:

    Будет нужен GIT 
sudo apt-get install git
git clone --depth 1 https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh

export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk-amd64

./configure
touch src/classdist_noinst.stamp
cd src/

CLASSPATH=.:./.:$CLASSPATH javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java

cd ..
make
sudo make install

  • Storm     
    Собственной персоны
wget https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip
unzip storm-0.8.1.zip
sudo mkdir /mnt/storm
sudo chmod 777 /mnt/storm

         правим conf/storm.yaml
storm.zookeeper.servers:
- "111.222.333.444"
- "555.666.777.888"

storm.local.dir: "/mnt/storm"

nimbus.host: "111.222.333.44"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

Запускаем:
  • Nimbus на мастере "bin/storm nimbus"
  • Supervisor на каждой рабочей машине "bin/storm supervisor"
  • UI на мастере "bin/storm ui" http://{nimbus host}:8080

вторник, 24 сентября 2013 г.

HBase загрузка больших массивов данных через bulk-load

Привет коллеги.
Хочу поделиться своим опытом использования HBase, а именно рассказать про bulk loading. Это еще один метод загрузки данных. Он принципиально отличается от обычного подхода (записи в таблицу через клиента). Есть мнение, что с помощью bulk load можно очень быстро загружать огромные массивы данных. Именно в этом я решил разобраться.

Итак, обо всём по порядку. Загрузка через bulk load происходит в три этапа:
  1. Помещаем файлы с данными в HDFS
  2. Запускаем MapReduce задачу, которая преобразует исходные данные непосредственно в файлы формата  HFile, посути HBase хранит свои данные именно в таких файлах.
  3. Запускаем bulk load функцию, которая зальёт (привяжет) полученные файлы в таблицу HBase.




В данном случае мне было необходимо прочувствовать эту технологию и понять её в цифрах: чему равна скорость, как она зависит от количества и размера файлов. Эти числа слишком зависимы от внешних условий, но помогают понять порядки между обычной загрузкой и bulk load.

Исходные данные:

  • Кластер под управлением Cloudera CDH4, HBase 0.94.6-cdh4.3.0.
    Три виртуальных хоста (на гипервизоре), в конфигурации CentOS / 4CPU / RAM 8GB / HDD 50GB
  • Тестовые данные хранились в CSV файлах различных размеров, суммарным объёмом 2GB, 3.5GB, 7.1GB и 14.2GB

Сначала о результатах:


Bulk loading
суммарный размер данных
количество файлов
суммарное количество записей (rows)
количество map
количество reduce
время задачи (Job, сек)
время общее (сек)
2GB
16
4 000 000
16
10
88
160
2GB
100
4 000 000
100
10
137
207
3.5GB
28
7 000 000
28
10
120
191
3.5GB
100
7 000 000
100
10
183
253
3.5GB
1
7 000 000
28
10
123
192
7.1GB
100
14 000 000
100
10
314
380
7.1GB
1
14 000 000
55
10
258
330
14.2G
1
28 000 000
109
10
510
583

Cкорость:
  • Max 29.2 Mb/sec или 58K rec/sec (3.5GB в 28 файлах)
  • Average 27 Mb/sec или 54K rec/sec(рабочая скорость, более приближенная к реальности )
  • Min  14.5 Mb/sec или 29K rec/sec (2GB в 100 файлах)
  • 1 файл загружается на 20% быстрее чем 100

Размер одной записи (row): 0.5Кb
Время инициализации MapReduce Job: 70 sec
Время загрузки файлов в HDFS с локальной файловой системы:
  • 3.5GB / 1 файл - 65 sec 
  • 7.5GB / 100 - 150 sec 
  • 14.2G / 1 файл - 285 sec

Загрузка через клиенты:

Загрузка осуществлялась с 2-х хостов по 8 потоков на каждом.
Клиенты запускались по крону в одно и тоже время, загрузка CPU не превышала 40%
Размер одной записи (row), как и в предыдущем случае был равен 0.5Кb.

количество записей (rows)
среднее время (сек)
4 000 000
109
7 000 000
208
14 000 000
380
28 000 000
836

Что в итоге?


Реализовать этот тест, я решил на волне разговоров о bulk load, как о способе сверхбыстрой загрузки данных. Стоит сказать, что в официальной документации речь идет только о снижении нагрузки на сеть и CPU. Как бы там ни было,  я не вижу выигрыша в скорости. Тесты показывают что bulk load быстре всего лишь в полтора раза, но не будем забывать, что это без учета инициализации m/r джобы. Кроме того, данные надо доставить в HDFS, на это тоже потребуется какое то время.
Думаю, стоит относиться к bulk load просто, как к еще одному способу загрузки данных, архитектурно иному (в некоторых случаях очень даже удобному).

А теперь о реализации

Теоретически всё довольно просто, но на практике возникает несколько технических нюансов.

//Создаём джоб
Job job = new Job(configuration, JOB_NAME);
job.setJarByClass(BulkLoadJob.class);

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

job.setMapperClass(DataMapper.class);
job.setNumReduceTasks(0);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);

FileInputFormat.setInputPaths(job, inputPath);
HFileOutputFormat.setOutputPath(job, new Path(outputPath));

HTable dataTable = new HTable(jobConfiguration, TABLE_NAME);
HFileOutputFormat.configureIncrementalLoad(job, dataTable);

//Запускаем
ControlledJob controlledJob = new ControlledJob(
    job,
    null
);

JobControl jobController = new JobControl(JOB_NAME);
jobController.addJob(controlledJob);

Thread thread = new Thread(jobController);
thread.start();
.
.
.
//Даём права на output
setFullPermissions(JOB_OUTPUT_PATH);

//Запускаем функцию bulk-load
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(jobConfiguration);
loader.doBulkLoad(
        new Path(JOB_OUTPUT_PATH),
        dataTable
);

  • MapReduce Job создаёт выходные файлы c правами пользователя, от имени которого он был запущен.
  • bulk load всегда запускается от имени пользователя hbase, поэтому не может прочитать подготовленные для него файлы, и валится вот с таким исключением:
    org.apache.hadoop.security.AccessControlException: Permission denied: user=hbase
Поэтому надо запускать Job от имени пользователя hbase или раздать права на выходные файлы (именно так я сделал).
  • Необходимо правильно создать таблицу HBase. По умолчанию она создается с одним Region-ом. Это приводит к тому, что создается только один редьюсер и запись идет только на одну ноду, загружая её на 100%, остальные при этом курят.
    Поэтому при создании новой таблицы надо сделать pre-split. В моём случае таблица разбивалась на 10 Region-ов равномерно разбросанных по всему кластеру.

    //Создаём таблицу и делаем пре-сплит
    HTableDescriptor descriptor = new HTableDescriptor(
            Bytes.toBytes(tableName)
    );
    
    descriptor.addFamily(
            new HColumnDescriptor(Constants.COLUMN_FAMILY_NAME)
    );
    
    HBaseAdmin admin = new HBaseAdmin(config);
    
    byte[] startKey = new byte[16];
    Arrays.fill(startKey, (byte) 0);
    
    byte[] endKey = new byte[16];
    Arrays.fill(endKey, (byte)255);
    
    admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
    admin.close();
    

    • MapReduce Job пишет в выходную директорию, которую мы ему указываем, но при этом создает  субдиректории, одноименные с column family. Файлы создаются именно там.
    В целом, это всё. Хочется сказать, что это довольно грубый тест, без хитрых оптимизаций, поэтому если у вас есть что добавить, буду рад услышать.

    Весь код проекта доступен на GitHub: https://github.com/2anikulin/hbase-bulk-load


    воскресенье, 25 августа 2013 г.

    Cloudera oozie WebUI error


    Не заботает web консоль oozie в Клаудере.
    Ругается:

    Oozie web console is disabled.
    To enable Oozie web console install the Ext JS library.

    ставим библиотеку:  

    wget 'http://extjs.com/deploy/ext-2.2.zip'
    unzip ext-2.2.zip
    sudo cp ext-2.2 /usr/lib/oozie/libext
    даём права:
    sudo chmod -R ugo+rwx /usr/lib/oozie/libext/ext-2.2

    прописываем симлинк:
    cd /var/lib/oozie/
    sudo ln -s /usr/lib/oozie/libext/ext-2.2 ext-2.2

    перезапускаем oozie

    понедельник, 29 июля 2013 г.

    вторник, 16 июля 2013 г.

    понедельник, 15 июля 2013 г.

    Hadoop MapReduce "Can't read partitions file"

    Во время выполнения джобы, падает вот такое исключение "Can't read partitions file"


    13/07/15 18:48:42 WARN mapred.LocalJobRunner: job_local38891965_0001
    java.lang.Exception: java.lang.IllegalArgumentException: Can't read partitions file
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
    Caused by: java.lang.IllegalArgumentException: Can't read partitions file
    at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:108)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:70)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:588)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:657)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:331)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
    Caused by: java.io.FileNotFoundException: File file:/home/anikulin/_partition.lst does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:468)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:373)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1704)
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1728)
    at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.readPartitions(TotalOrderPartitioner.java:293)
    at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:80)
    ... 12 more


    Это значит что наш кластер функционирует в режиме "pseudo distributed mode"
    Хотя никто его об этом не просил!
    Чинится прописыванием адреса JobTracker в mapred-site.xml

    В Клоудеровском исполнении это делается из консоли, добавлением свойств в
    "MapReduce Service Configuration Safety Valve for mapred-site.xml"

    <property>
           <name>mapred.job.tracker</name>
           <value>categorizer-hadoop-1.mywork:8021</value>
    </property>


    http://archive.cloudera.com/cdh4/cdh/4/hbase/book.html

    Вот опять начались дни Хадупа в Макдональдсе

    sudo -u hdfs hadoop dfsadmin -safemode leave

    Переводит ноду из режима "Name node is in safe mode" 

    понедельник, 24 июня 2013 г.

    Переключаемся между разными JVM

    Иногда требуется переключиться с одной версии джавы на другую.
    Для этого есть парочка полезных команд. Запускаем и выбираем нужное...

    sudo update-alternatives --config java
    To update the Java compiler run:
    sudo update-alternatives --config javac

    среда, 1 мая 2013 г.

    Apache Thrift RPC Server. Дружим C++ и Java

    Thrift - технология для организации межпроцессного взаимодействия между компонентами системы. Была разработана где то в недрах Facebook. Посути это кросс-языковой фреймворк для создания RPC сервисов, на бинарном протоколе. С помощью этого решения можно "подружить" компоненты написанные на разных языках  C#, C++, Delphi, Erlang, Go, Java, PHP, Python, Ruby, итд. Описание сигнатур сервисов и данных осуществляется с помощью специального IDL - языка. Технология, по своей сути, похожа на COM, но без всей этой обвязки с регистрацией компонент. Так же не будем забывать, что COM это технология только для Windows, в то время как Thrift - кросплатформенна.

    Вобщем решил поэкспериментировать, попробовать вынести часть нагруженной-вычислительной логики из Java в С++, в надежде что нативный С++ код будет производительней, за одно опробовать Thrift RPC, в надежде что это быстрее чем REST.
    Как и положено, без бубнов и граблей не обошлось!

    И так, для начала надо всё это поставить:
    1. Ставим поддержку для Boost, ибо всё завязано на нём

    $ sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev

    2. качаем thrift tarball http://apache.softded.ru/thrift/0.9.0/thrift-0.9.0.tar.gz
    распаковываем, запускаем configure, затем собираем

    $ ./configure
    $ make
    $ sudo make install

    Вроде бы всё... Можно даже попробовать сгенерировать код из учебника, который идет вместе с thrift tarball

    $ thrift --gen cpp tutorial.thrift

    команда thrift сгенерирует С++ обвертки, и бережно положит их в директорию gen-cpp. Тоже самое можно сделать для Java, PHP итд...

    Пробуем скомпилировать и собрать нагенеренные исходники

    $ g++ -Wall -I/usr/local/include/thrift *.cpp -L/usr/local/lib -lthrift -o something

    Упс, получите:  error: ‘uint32_t’ does not name a type
    Оказывается есть небольшой таракан в библиотеках thrift связанный с uint32_t.  Лечится  добавлением "#include <stdint.h>" в "Thrift.h", или (что лучше всего) специальными опциями компилятора -DHAVE_NETINET_IN_H -DHAVE_INTTYPES_H

    Теперь это выглядит так:

    $ g++ -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H -Wall -I/usr/local/include/thrift *.cpp -L/usr/local/lib -lthrift -o something

    Вот и всё, появился исполнимый файл, под названием something.
    Запускаем, и получаем: error while loading shared libraries: libthrift.so.0: cannot open shared object file: No such file or directory
    Возможно есть какие-то элегантные методы решения этой проблемы, но я решил её в лоб, копированием thrift файлов из /usr/local/lib в /lib

    Всё, пример запустился. Значит, все на местах, и всё работает.

    Теперь можно писать свой RPC сервер.
    Его задача, быть key-value хранилищем. Хранить длинные (в несколько сот тысяч) битовые маски. Складывать их (AND), и отдавать клиенту массив индексов, в которых получились еденицы. Да, почти тоже самое умеет Redis, но он мне не подходит.

    Полный код лежит здесь: https://github.com/2anikulin/fast-hands.git

    Описываем сигнатуры данных и сервисов, в thrift definition file:

    namespace cpp fasthands
    namespace java fasthands
    namespace php fasthands
    namespace perl fasthands
    
    exception InvalidOperation {
      1: i32 what,
      2: string why
    }
    
    service FastHandsService {
    
     i32 put(1:i32 key, 2:binary value),
     
     binary get(1:i32 key),
     
     list <i32> bitAnd(1:list<i32> keys) throws (1:InvalidOperation ouch)
    }
    

    И генерируем обвертки.
    Реализация на C++
    Этот код, создает, и запускает RPC сервер

    #define PORT 9090
    #define THREAD_POOL_SIZE 15
    
    
    int main() {
    
      printf("FastHands Server started\n");
    
      shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
      shared_ptr<FastHandsHandler> handler(new FastHandsHandler());
      shared_ptr<TProcessor> processor(new FastHandsServiceProcessor(handler));
    
      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(THREAD_POOL_SIZE);
      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
      threadManager->threadFactory(threadFactory);
      threadManager->start();
    
      TNonblockingServer server(processor, protocolFactory, PORT, threadManager);
      server.serve();
      printf("done.\n");
    
      return 0;
    }
    

    В классе FastHandsHandler - имплементируется весь наш, прикладной функционал
    Это заголовочный файл

    class FastHandsHandler : virtual public FastHandsServiceIf {
    
     public:
      FastHandsHandler();
      int32_t put(const int32_t key, const std::string& value);
      void get(std::string& _return, const int32_t key);
      void bitAnd(std::vector<int32_t> & _return, const std::vector<int32_t> & keys);
    
     private:
      void appendBitPositions(std::vector<int32_t> & positions, unsigned char bits, int offset);
    
     private:
      std::map<int32_t, std::string> m_store;
    
    };
    


    Пробуем собрать, и получаем очередную ошибку: c++ undefined reference to apache::thrift::server::TNonblockingServer
    Дело в том, что, в отличии от учебника, мой сервер - ассинхронный, и использует класс TNonblockingServer. Чтобы код собирался, надо добавить дополнительные библиотеки -lthriftnb -levent

    т.е сборка сейчас будет выглядеть так:
    g++ -DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H -Wall -I/usr/local/include/thrift *.cpp -L/usr/local/lib -lthrift -lthriftnb -levent -o something

    Теперь все хорошо. Код скомпилирован, слинкован, на выходе файл по имени something
    Генерируем обвертки для java, и пишем вот такого клиента


    import fasthands.FastHandsService;
    import fasthands.InvalidOperation;
    import org.apache.thrift.TException;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransportException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    
    public class JavaClient {
    
        public static void main(String [] args) {
            TTransport transport = new TFramedTransport(new TSocket("localhost", 9090));
            TProtocol protocol = new TBinaryProtocol(transport);
            final FastHandsService.Client client = new FastHandsService.Client(protocol);
    
            final List<Integer> filters = new ArrayList<Integer>();
    
            try {
                transport.open();
    
                int count = 12500;
                byte bt[] = new byte[count];
                for (int i =0; i < count; i++) {
                    bt[i] = (byte)0xFF;
                }
    
                for (int i = 0; i < 50; i++) {
                    client.put(i, ByteBuffer.wrap(bt)) ;
                    filters.add(i);
                }
    
                List<Integer> list = client.bitAnd(filters);
                System.out.println(list.size());  
    
            } catch (TTransportException e) {
                e.printStackTrace();
            } catch (TException e) {
                e.printStackTrace();  
            }
    
            transport.close();
        }
    }
    

    Что в итоге.
    Интересная технология, и не плохой способ прикрутить транспортный функционал к голому коду на С++. Но не скажу, что это намного быстрее чем REST, бинарные данные прекрасно передаются и по http. Что касается производительности кода, вынесенного из Java в С++, то чуда не произошло, он быстрее в 1.2 - 1.4 раза, ибо сериализация + расходы на транспортный уровень.


    Полезные ссылки:
    http://thrift.apache.org/
    http://wiki.apache.org/thrift/ThriftUsageC%2B%2B
    http://fundoonick.blogspot.ru/2010/06/sample-thrift-program-for-server-in.html

    четверг, 18 апреля 2013 г.

    Unsupported major.minor version 51.0

    Часто, в последнее время сталкиваюсь с таким исключением. Дабы не лазить на StackOverflaw - напишу здесь Это значит, что подключаемые внешние jar файлы, собраны для более новой версии java. А мы пытаемся запуститься на старой. Обычно речь идет о JRE 6 -7

    Exception in thread "main" java.lang.UnsupportedClassVersionError: org/eclipse/jetty/server/Handler : Unsupported major.minor version 51.0
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)

    суббота, 6 апреля 2013 г.

    VirtualBox проблемы после обновления linux kernel

    Почти после каждого обновления Ubuntu, отваливается VirtualBox.
    Выкидывает, такое сообщение:
    Kernel driver not installed (rc=-1908)
    The VirtualBox Linux kernel driver (vboxdrv) is either not loaded or there is a permission problem with /dev/vboxdrv. Re-setup the kernel module by executing
    '/etc/init.d/vboxdrv setup'
    as root. Users of Ubuntu, Fedora or Mandriva should install the DKMS package first. This package keeps track of Linux kernel changes and recompiles the vboxdrv kernel module if necessary.
    Напрягает!
    Чинится, вот так:
    sudo apt-get install linux-headers-`uname -r`
    sudo apt-get remove dkms  
    sudo apt-get install dkms virtualbox-dkms  
    sudo modprobe vboxdrv



    четверг, 28 февраля 2013 г.

    Экспорт Hadoop - MapReduce в MongoDB

    Понадобилось направить в MongoDB выход хадуповского M/R .
    Пришлось повозиться.

    Используем MongoDB+Hadoop Connector, не забываем про avro без него не будет работать
    <dependency>
       <groupId>org.mongodb</groupId>
       <artifactId>mongo-hadoop-core_cdh4b1</artifactId>
       <version>1.0.0</version>
    </dependency>
    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>avro</artifactId>
       <version>1.1.0</version>
    </dependency>
    1. MongoConfigUtil.setOutputURI - надо ставить до вызова   Job job = new Job(configuration, JOB_NAME); Иначе получаем ошибку связанную с невозможностью подключиться к Монге, дело в том что setOutputURI ставит параметры в configuration, которые Job копирует при создании.

    2. Надо обязательно ставить выходной тип ключа для редьюса MongoConfigUtil.setOutputKey(configuration, Text.class);
    иначе по умолчанию MongoConfigUtil ставит BSON, и становится тяжело понять, почему валится job

    3. Указываем Output Format  job.setOutputFormatClass(MongoOutputFormat.class);
    public static Job createJob(Configuration configuration,  String... paths)   throws IOException, JSONException {
            MongoConfigUtil.setOutputURI(
                configuration, "mongodb://mongo-stg-2:27102/buffer.jobout"
            );           
            MongoConfigUtil.setOutputKey(configuration, Text.class);
            Job job = new Job(configuration, JOB_NAME);
            job.setJarByClass(MongoWriterJob.class);
            job.setReducerClass(MongoWriterReducer.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(ResultJSONWritable.class);
            job.setOutputFormatClass(MongoOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DBObject.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            for (String path : paths) {
                FileInputFormat.addInputPath(job, new Path(path));
            }
            return job;
      }

    Ну и наконец, reducer на выходе должен отдавать DBObject :
    public static class MongoWriterReducer extends Reducer<
                ImmutableBytesWritable,
                ResultJSONWritable,
                Text,
                DBObject> {...


    пятница, 22 февраля 2013 г.

    MongoDB map-reduce, интересные нюансы

    Задача:
    Необходимо сгруппировать объекты по атрибуту source_name, и посчитать количество таких объектов для каждой группы.

    Решение:
    В MongoDB есть Aggregation framework, он позволяет в том числе запускать m/r задачи.
    Надо сказать что в Монге какой то странный мэп-редьюс. Во первых он однопоточный, что противоречит самой парадигме m/r, а во вторых с редьюсами происходит что то не понятное, что запутывает окончательно.

    Первое решение (логичное, но не правильное) было таким:
    Всё как обычно, каждому ключу ставим значение 1, в редьюсе подсчитываем их количество
    db.buffer.mapReduce(
        "function() {
             emit(this.source_name, 1);
        }",
        "function(k, vals) {
             var count = vals.length;
             return {count: count};
        }",
        {out: "buffer_results"}
    );
    out:
    {
        "result" : "buffer_results",
        "timeMillis" : 70258,
        "counts" : {
            "input" : 2803200,
            "emit" : 2803200,
            "reduce" : 166542,
            "output" : 13
         },
       "ok" : 1,
    }

    { "_id" : "source_A", "value" : { "count" : 2601 } }
    { "_id" : "source_B", "value" : { "count" : 50456 } }
    { "_id" : "source_C", "value" : { "count" : 23422 } }
    { "_id" : "source_D", "value" : { "count" : 725523 } }
    { "_id" : "source_E", "value" : { "count" : 234 } }
    { "_id" : "source_F", "value" : { "count" : 123111 } }
    { "_id" : "source_G", "value" : { "count" : 4 } }
    { "_id" : "source_H", "value" : { "count" : 1 } }
    { "_id" : "source_I", "value" : { "count" : 1 } }
    { "_id" : "source_J", "value" : { "count" : 7363 } }
    { "_id" : "source_K", "value" : { "count" : 560 } }
    { "_id" : "source_L", "value" : { "count" : 10 } }
    Если суммировать все count, должно получиться  2803200, но не получается. Такое впечатление, что это результат какого то не полного редьюса.

    И действительно,  выясняется интересный нюанс. Редьюс может быть несколько раз вызван для одного и того же ключа. Невероятно, но факт! Это связанно с архитектурой Монги, m/r может выполняться на кластере,  документы с одним и тем же ключом могут находится на разных серверах. Сначала редьюс по ключу выполняется на одном сервере, а затем на другом.
    После чего происходит финальный редьюс, в который попадают результаты выполнения первых двух. Т.е входом в редьюс могут быть как значения выхода мэпа, так и редьюса.



    Теперь всё встало на свои места. В сложившейся ситуации, код m/r должен быть таким:
    db.buffer.mapReduce(
       "function() {
           emit(this.source_name, {count: 1});
       }",
       "function(key, values) {
          var count = 0; 
          values.forEach(function(v) {count += v['count'];}); 
          return {count: count};
       }", 
       {out: "buffer_results"}
    );
    Мэп и редьюс генерируют одинаковые объекты на выходе. Поэтому редьюсу без разницы, от кого пришли данные.
    out:
    {
        "result" : "buffer_results",
        "timeMillis" : 70258,
        "counts" : {
                "input" : 2803200,
                    "emit" : 2803200,
                "reduce" : 166542,
                "output" : 13
        },
        "ok" : 1,
    }

    { "_id" : "source_A", "value" : { "count" : 2601 } }

    { "_id" : "source_B", "value" : { "count" : 1380432 } }
    { "_id" : "source_C", "value" : { "count" : 106087 } }
    { "_id" : "source_D", "value" : { "count" : 854294 } }
    { "_id" : "source_E", "value" : { "count" : 100509 } }
    { "_id" : "source_F", "value" : { "count" : 151713 } }
    { "_id" : "source_G", "value" : { "count" : 4 } }
    { "_id" : "source_H", "value" : { "count" : 1 } }
    { "_id" : "source_I", "value" : { "count" : 1 } }
    { "_id" : "source_J", "value" : { "count" : 72363 } }
    { "_id" : "source_K", "value" : { "count" : 2560 } }
    { "_id" : "source_L", "value" : { "count" : 10 } }

    В итоге получаем правильные результаты. Дебит сходится с кредитом.






    Фтопку !

    Здесь буду перечислять всякие неприятности, о которых стоит помнить в следующий раз...

    -------------------------------------------------------------------------------------------
    au.com.bytecode.opencsv.CSVReader в топку !

    Жутко заглючил у заказчика.
    Если от парсера не требуется мэппинг в POJO, то лучше свой написать.
    Всего то 10 строк кода.
    -------------------------------------------------------------------------------------------

    среда, 20 февраля 2013 г.

    Интеграция Hive c HBase (грабли)

    Hive - замечательная вещь. Про то, что это и зачем это, написано уже не мало.
    В основном все примеры описывают, как делать запросы по хадуповским файлам.
    А вот как делать HQL запросы в HBase - информации мало.

    Т.к без слёз делать выборки в HBase не получается, пришлось прикручивать Hive

    Порядок действий прост.
    Создаём в консоли Hive, таблицу связанную с таблицей в HBase, настраиваем мэппинг полей
    hive> CREATE EXTERNAL TABLE hive_raw_data(key string,  dmp_id string,  referrer string,  source_id string,  source_name string)
        > STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
        > WITH SERDEPROPERTIES ("hbase.columns.mapping" = "data:dmp_id,  data:referrer,  data:source_id,  data:source_name")      
        > TBLPROPERTIES("hbase.table.name" = "raw_data");

    Ууупс, первые грабли. Система не внятно ругается, и никак не создает таблицу. Понять в чем дело - трудно. Оказывается при описании мэппинга ("hbase.columns.mapping" = "data:dmp_id,  data:referrer,  data:source_id,  data:source_name"), не должно быть пробелов между полями! Должно быть вот так: "hbase.columns.mapping" = "data:dmp_id,data:referrer,data:source_id,data:source_name"

    Исправляем, и... таблица создана.
    запускаем тест select * from hive_row_data и видим что всё работает.
    Но радоваться рано, простой select запускает обычный перебор, а как только запрос становится чуть сложнее например select * from hive_row_data where key = 'xxx' вот и начинает всё валится (ибо запускается механизм Map - reduce).
    В итоге получаем исключение, вот такого плана:
    Exception in thread "Thread-48" java.lang.NullPointerException
        at org.apache.hadoop.hive.shims.Hadoop23Shims.getTaskAttemptLogUrl(Hadoop23Shims.java:44)
        at  org.apache.hadoop.hive.ql.exec.JobDebugger$TaskInfoGrabber.getTaskInfos
        (JobDebugger.java:186)
        at org.apache.hadoop.hive.ql.exec.JobDebugger$TaskInfoGrabber.run(JobDebugger.java:142)
        at java.lang.Thread.run(Thread.java:662) 
    На самом деле, те исключения которые валятся в консоль, можно игнорировать. Смотреть надо лог джоба. Залазим - смотрим:
    INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201302201311_0003_m_000000_3: java.io.IOException: Cannot create an instance of InputSplit class = org.apache.hadoop.hive.hbase.HBaseSplit:Class     org.apache.hadoop.hive.hbase.HBaseSplit not found at       org.apache.hadoop.hive.ql.io.HiveInputFormat$HiveInputSplit.readFields(HiveInputFormat.java:146)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:73)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:44)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:351)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:367)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:327)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1332)
    at org.apache.hadoop.mapred.Child.main(Child.java:2
    Это значит что map-reduce задачи, запускаемые Hive-ом ничего не знают про HBase, и просто не могут найти нужные jar. Беда в том, что эти библиотеки нужны на всех нодах.

    Решение здесь такое:
    1. Сохраняем нужные Jar в HDFS (/user/hive/)
    hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
    hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar

    в моем случае это выглядит так:

    hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-0.9.0-cdh4.1.3.jar /user/hive/hive-hbase-handler-0.9.0-cdh4.1.3.jar
    hadoop dfs -put /usr/lib/hive/lib/hbase-0.92.1-cdh4.1.3-security.jar /user/hive/hbase-0.92.1-cdh4.1.3-security.jar

    Затем правим hive-site.xml:
    <property>
        <name>hive.aux.jars.path</name>
        <value>/user/hive/hive-hbase-handler-0.9.0-cdh4.1.3.jar,/user/hive/hbase-0.92.1-cdh4.1.3-security.jarN.jar</value>
    </property>
    Вот и всё. После этого Hive начинает исправно выполнять сложные выборки.
    Работаем и наслаждаемся.

    https://cwiki.apache.org/Hive/hbasebulkload.html





    суббота, 16 февраля 2013 г.

    Hadoop - Слоники большие и маленькие

    Hadoop в последнее время шагает в массы. Вот и мне довелось с ним работать.
    Отличная система, но грабельки большие и маленькие всё таки там раскиданы.
    Вроде бы мелочи, но на решение каждой уходит по пол дня.

    И так, во первых Hadoop бывает в нескольких дистрибутивных исполнениях:

    1. Apache, как есть http://hadoop.apache.org/releases.html
    Hadoop в чистом виде. Ставить на всех нодах надо руками, править конфиги. Учить его видеть HBase. В общем надо с головой уходить в администрирование. Свои плюсы в этом конечно есть, но где взять столько времени ?

    2. Cloudera http://www.cloudera.com
    Половина, из создателей Hadoop организовала свою кантору и свой дистрибутив.
    Главная его изюминка - это ClouderaManager, с помощью которого можно очень легко управлять кластером, мониторить, добавлять ноды. Ставиться всё в два клика, система разворачивается на кластере, всё нужное в конфигах прописывается автоматически. В общем прелесть.
    Это коммерческая система, но есть бесплатная версия, с ограничением в 50 нод на кластер.

    Так выглядит Web консоль ClouderaManager


    3. MapR http://www.mapr.com/
    Другая половина создателей Hadoop решала сделать свой лунопарк, и выпустила свой дистрибутив. Всё как в Cloudera или в Cloudera всё как в MapR. Плюс комунити, форум, и отсутствие ограничений на размер кластера. Вобщем тоже красота.

    MapR я не пробовал, а начал работать с Cloudera, по этому о всех изысках по порядку:

    1. У Cloudera свой мавновский репозиторий. 
    И необходимо использовать именно их зависимости в купе с версиями системы на вашем кластере
    ---------------------------------------------------------------------------
     <repositories>
         <repository>
             <id>apache</id>
             <url>https://repository.apache.org/content/repositories/releases/
             </url>
         </repository>
         <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/
            </url>
        </repository>
    </repositories>
    ---------------------------------------------------------------------------

    <dependency>
         <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>2.0.0-mr1-cdh4.1.2</version>
          <scope>provided</scope>
    </dependency>
    ---------------------------------------------------------------------------

    2. Минимизация Super Jar
    Я привык собирать super jar. Это удобно, все нужные зависимости находятся уже внутри.
    Таким способом можно избежать возможных проблем в продакшене.
    Но как оказалось, для map - reduce такой подход опасен. Когда в jar включаются клоудеровские зависимости, в run time начинается какой то бардак. Такое впечатление что вместо одних методов начинают вызываются совсем другие. Поэтому из конечного архива пришлось исключить все клоудеровские зависимости. Сделать это можно просто, добавив ключ  <scope>provided</scope> к мавновской зависимости. После этого m/r работает как надо

    Но если надо работать с HBase, то следует полдностью включить org.apache.hbase
    Иначе хадуп просто не видит этих библиотек

    3. Map-reduce значения по умолчанию
    Очень важно непосредственно определять входные и выходные форматы Job-а, т.к по умолчанию они всегда текстовые.

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    Тоже самое касается форматов входа и выхода Mapper-ов и Reducer-ов. Особенно болезненно это проявляется в задачах со сцепленными Job-ми

    job.setOutputKeyClass(ReportReduceWritable.class);   //Отдельно для Reduce
    job.setOutputValueClass(MyKeyWritable.class);

    job.setMapOutputKeyClass(Text.class);                           //Отдельно для Map
    job.setMapOutputValueClass(MyReduceWritable.class);

    4. Удаление директорий
    Перед запуском m/r задачи надо удалять все выходные директории, которые были созданы после предыдущего запуска. Нначе Job будет ругаться и вываливаться.

    5. Итератор Reducer-a нельзя использовать повторно
    Интересная грабелька. Когда мы имплементируем org.apache.hadoop.mapreduce.Reducer
    на вход приходит некий итератор Iterable<MyValue> values. Он позволяет считать данные только один раз. После этого перевести курсор в начало не возможно. Приходится выкручиваться сохранением элементов во временную коллекцию.


    6. Запуск Job от имени другого пользователя и TableMapReduceUtil.initTableMapperJob
    Убили на эти грабли день, прежде чем понять в чем дело.
    Запускаем Job по крону от имени специального - системного пользователя
    dmpservice. Валится !

    java.io.IOException: java.lang.RuntimeException: java.io.IOException: Permission denied
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.findOrCreateJar(TableMapReduceUtil.java:521)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:472)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(TableMapReduceUtil.java:438)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:138)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:215)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:81)
    at ru.crystaldata.analyticengine.mr.HBaseReaderJob.createJob(HBaseReaderJob.java:206)
    at ru.crystaldata.analyticengine.JobsRunner.main(JobsRunner.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
    Caused by: java.lang.RuntimeException: java.io.IOException: Permission denied
    at org.apache.hadoop.util.JarFinder.getJar(JarFinder.java:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.findOrCreateJar(TableMapReduceUtil.java:518)
    ... 12 more
    Caused by: java.io.IOException: Permission denied
    at java.io.UnixFileSystem.createFileExclusively(Native Method)
    at java.io.File.checkAndCreate(File.java:1704)
    at java.io.File.createTempFile(File.java:1792)
    at org.apache.hadoop.util.JarFinder.getJar(JarFinder.java:156)
    ... 17 more

    Делаем всё тоже самое но от обычного пользователя - работает.
    Убираем из джоба HBase - мепперы, запускаем от dmpservice - работает.
    Вывод: что то темное творится в недрах TableMapReduceUtil.initTableMapperJob

    Как оказалось TableMapReduceUtil.initTableMapperJob создает темповый файл для своих внутренних нужд, и пытается разместить его в HOME, а так как у пользователя dmpservice,
    не заданна данная переменная, то запись производится в root /, на который естественно нет прав.

    Лечится - при создании задачи крону, прописываем HOME=var/tmp


    Пока это всё. Удачи в начинаниях :)