Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

У статті докладно розглядаються приклади, які демонструють розширення і реалізацію класу CombineFileInputFormat для читання вмісту файлів gzip (кодек за замовчуванням) в середовищі виконання. Ви дізнаєтеся, як застосовувати CombineFileInputFormat з технологією MapReduce для відділення розміру порції даних, споживаної інтерфейсом Mapper, від розміру блоку файлів в HDFS.

Суджай Сом. консультант по великим даними і розробник додатків, IBM

Бібліотека програм Apache Hadoop дозволяє обробляти дані в різних форматах, від простих текстових файлів до баз даних. У середовищі MapReduce використовується клас InputFormat для оцінки вхідних специфікації і розбиття вхідних файлів на логічні об'єкти InputSplit, кожен з яких потім призначається окремому екземпляру Mapper.

FileInputFormat. який є базовим класом для всіх файлових форматів InputFormat. має наступні прямі підкласи:

  • TextInputFormat
  • SequenceFileInputFormat
  • NLineInputFormat
  • KeyValueTextInputFormat
  • CombineFileInputFormat

Кожен з цих класів InputFormat пропонує родову реалізацію методу getSplits (JobContext). яка також може заміщати метод isSplitable (JobContext. Path), щоб вхідні файли не розділялися і цілком оброблялися інтерфейсом Mapper. Він реалізує клас createRecordReader. успадковані від класу org.apache.hadoop.mapreduce.InputFormat. який використовується для збору вхідних записів з логічного об'єкта InputSplit для обробки інтерфейсом Mapper.

InfoSphere BigInsights Quick Start Edition

Однак Hadoop більш ефективно працює з невеликою кількістю великих файлів, ніж з великою кількістю невеликих файлів. (В даному випадку «невеликий файл» означає, що його розмір значно менше розміру блоку Hadoop Distributed File System (HDFS)). Проблема вирішується шляхом створення класу CombineFileInputFormat. який ефективно працює з невеликими файлами, щоб клас FileInputFormat створював розбиття по файлах. CombineFileInputFormat поміщає безліч файлів в кожен елемент розбиття, щоб кожен екземпляр Mapper отримував більше даних для обробки. Клас CombineFileInputFormat може також надати переваги при обробці великих файлів. По суті, він відокремлює розмір порції даних, споживаної інтерфейсом Mapper, від розміру блоку файлів в HDFS.

В даний час CombineFileInputFormat є абстрактним класом в бібліотеці класів Hadoop (org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat) Без будь-якої конкретної реалізації. Для використання CombineFileInputFormat потрібно створити конкретний підклас CombineFileInputFormat і реалізувати метод CombineFileInputFormat and implement the createRecordReader (). який інстанцірует делегує клас CustomFileRecordReader. розширює RecordReader. використовуючи свій спеціальний конструктор. Цей підхід також вимагає, щоб CustomWritable був створений як ключ для класу Hadoop Mapper. Для кожного рядка ключ CustomWritable буде містити ім'я файлу і довжину зміщення цього рядка.

Ознайомтеся з прикладом Hadoop. який демонструє, як використовувати CombineFileInputForma для підрахунку появ слів в текстових файлах в заданому вхідному каталозі.

Тут ви дізнаєтеся, як розширити і реалізувати CombineFileInputFormat. додаючи можливість читання вмісту файлів gzip (кодек за замовчуванням) в середовищі виконання в розпакованому вигляді. Ми зробимо результат ключ-значення, де ключ - це комбінація імені файлу і зміщення рядки, а значення - це текстове представлення цього рядка. Приклад в цій статті використовує CustomInputFormat в середовищі MapReduce. Ми розробимо наступні ключові класи:

CompressedCombineFileInputFormat Розширює CombineFileInputFormat і реалізує createRecordReader для передачі в зчитувач записів, який виконує логіку для об'єднаних файлів CompressedCombineFileRecordReader Розширює RecordReader і є делегує класом CombineFileRecordReader CompressedCombineFileWritable Реалізує інтерфейс WritableComparable. зберігає ім'я файлу і зсув, а також заміщає метод compareTo. щоб спочатку порівнювати ім'я файлу, потім зсув.

Приклад CompressedCombineFileInputFormat використовує програму MapReduce, яка використовує загальнодоступні історичні дані про погоду Національного управління океанічних і атмосферних досліджень (National Oceanic and Atmospheric Administration, NOAA). NOAA збирає статистику з екстремальних показниками для станцій США, використовуючи щомісячні зведення для трьох метеорологічних показників - температура, випадання опадів і кількість снігу. Використовуваний тут приклад підраховує максимальну температуру з метеорологічних даних (National Climatic Data Center), які доступні в стислому (gzip) файлі. У розділі Ресурси наводиться додаткова інформація про отримання метеорологічних даних.

Вхідний формат для безлічі стислих файлів

Наше рішення використовує стиснене CombineFileInputFormat. який використовує три конкретних класу:

  • Підклас абстрактної реалізації CombineFileInputFormat (org.apache.hadoop.mapreduce.lib.input. CombineFileInputFormat)
  • Конкретний підклас RecordReader (org.apache.hadoop.mapreduce.RecordReader)
  • Спеціальний клас Writable. який реалізує WritableComparable і генерує ключ рядків файлу, який містить ім'я файлу і зсув рядків.

CompressedCombineFileInputFormat

CompressedCombineFileInputFormat.java - це підклас CombineFileInputFormat. Він реалізує InputFormat.createRecordReader (InputSplit, TaskAttemptContext) для побудови об'єктів RecordReader для підколекції вхідних файлів CombineFileSplit. На відміну від FileSplit. клас CombineFileSplit представляє не поділ файлу, а розділення вхідних файлів на менші набори. Поділ може містити блоки з різних файлів, однак все блоки в одному і тому ж наборі повинні бути локальними для конкретної стійки. CombineFileSplit використовується для реалізації RecordReader шляхом зчитування запису на кожен файл. Ділити файли навпіл не потрібно, тому метод isSplitable () заміщається для повернення значення false (в іншому випадку за замовчуванням було б значення true). Приклад показаний в лістингу 1.

Лістинг 1. CompressedCombineFileInputFormat.java

CompressedCombineFileRecordReader

CompressedCombineFileRecordReader.java - це делегує клас CombineFileRecordReader. є родовим класом RecordReader, який може видавати різні екземпляри RecordReader кожному блоку в CombineFileSplit. CombineFileSplit може комбінувати фрагменти даних з безлічі файлів. Він дозволяє використовувати різні екземпляри RecordReader для обробки фрагментів даних з різних файлів.

Коли викликається завдання Hadoop, CombineFileRecordReader зчитує розміри всіх файлів у вхідному шляху HDFS, які необхідно обробити і вирішити, скільки потрібно розділень, виходячи із значення MaxSplitSize. Для кожного відокремленого набору (який повинен бути файлом, оскільки метод isSplitabe () заміщується і повертає значення false) CombineFileRecordReader створює екземпляр CompressedCombineFileRecordReader і передає CombineFileSplit. context і index для CompressedCombineFileRecordReader з метою виявлення файлів для обробки.

Після інстанціацій CompressedCombineFileRecordReader визначає, чи містить вхідний файл кодек стиснення (org.apache.hadoop.io.compress.GzipCodec). Якщо містить, то файл розпаковується в середовищі виконання і використовується для обробки. В іншому випадку він вважається текстовим файлом. При обробці файлу CompressedCombineFileRecordReader створює CompressedCombineFileWritable як ключ для викликаного класу Mapper. Для кожного зчитування рядка CompressedCombineFileWritable містить ім'я файлу і довжину зміщення цього рядка, як показано в прикладі MapReduce.

У лістингу 2 показаний приклад CompressedCombineFileRecordReader.java.

Лістинг 2. CompressedCombineFileRecordReader.java

CompressedCombineFileWritable

Клас CompressedCombineFileWritable.java реалізує інтерфейс WritableComparable і розширює org.apache.hadoop.io.Writable, java.lang.Comparable. як показано в лістингу 3.

Writable - це Серіалізуемое об'єкт, який реалізує простий і ефективний протокол сериализации на базі DataInput і DataOutput. Будь-який тип ключа або значення в середовищі MapReduce реалізує цей інтерфейс. Реалізації зазвичай використовують статичний метод read (DataInput). який створює новий екземпляр, викликає метод readFields (DataInput). і повертає екземпляр.

Comparable - це інтерфейс, який є елементом Java ™ Collections Framework (JCF). Він застосовує повне впорядкування до об'єктів кожного класу, який його реалізує. Таке впорядкування називається природним упорядкуванням класу. Метод класу compareTo називається методом природного порівняння. Списки (або масиви) об'єктів, які реалізують цей інтерфейс, можуть сортуватися автоматично з використанням Collections.sort (і Arrays.sort). Об'єкти, які реалізують цей інтерфейс, можуть використовуватися як ключі в відсортованої колекції Map, або як елементи в відсортованому наборі, без необхідності вказувати компаратор.

У зв'язку з такими властивостями ви можете порівнювати CompressedCombineFileWritable з використанням компараторов. Команда hashCode () часто використовується в Hadoop для поділу ключів. Важливо, що реалізація hashCode () повертає один і той же результат для різних екземплярів JVM. Реалізація за замовчуванням hashCode () в Object не задовольняє цій властивості. Тому методи hashCode (). equals () і toString () замінені для забезпечення узгодженості та ефективності.

Лістинг 3. CompressedCombineFileWritable.java

приклад MapReduce

Приклад в цьому розділі показує, як використовувати CompressedCombineFileInputFormat з програмою MapReduce. Програма MapReduce використовує історичні дані NOAA про погоду з готовою статистикою екстремальних показників для станцій США у вигляді щомісячних зведень про температуру, випаданні опадів і кількості снігу. Приклад підраховує максимальні температури з метеорологічних даних, які знаходяться в стислому (gzip) файлі.

У прикладі розглядаються різні аспекти використання CompressedCombineFileInputFormat. Для виконання прикладу використовувалася платформа InfoSphere BigInsights Quick Start Edition.

  • Розгорнуто і працюватимуть у середовищі Hadoop.
  • Необхідні для прикладу дані завантажені з сайту Національного центру кліматичних даних (National Climatic Data Center. NCDC) управління NOAA.
  • Завантажені дані розміщені в HDFS.
  • Отримані статистичні дані обробляються на платформі InfoSphere BigInsights Quick Start Edition (в невиробничій середовищі).

виконання прикладу

Класи CompressedCombineFileInputFormat упаковані в файл JAR (CompressedCombine-FileInput.jar), на який можна посилатися в інших проектах. Ми будемо використовувати окремі програми MapReduce, щоб продемонструвати різницю в продуктивності при використанні вхідного формату за замовчуванням (org.apache.hadoop.io.Text) і спеціального вхідного формату (com.ssom.combinefile.lib.input. CompressedCombineFileInputFormat).

Вхідні дані

Дані NCDC загальнодоступні. У таблиці 1 показаний формат даних NCDC в наших прикладах.

Таблиця 1. Дані NCDC

Файли даних організовані по датах і метеорологічним станціям. Є каталог для кожного року починаючи з 1901-ого. Кожен каталог містить стислий файл для кожної метеорологічної станції з даними по кожному року. На малюнку 1 показані перші записи для 1947 року.

Малюнок 1. Приклад списку файлів

Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

Я вибрав дані для 1901, 1902 і 1947 років, які містяться в більш ніж 1000 стислих файлів. Загальний обсяг файлів становить приблизно 56 МБ.

Використання вхідного формату за замовчуванням

Клас Mapper - це родовий тип з чотирма формальними параметрами типу, які задають типи вхідного ключа, вхідного значення, вихідного ключа і вихідного значення функції map, як показано нижче.

Для вхідного формату за замовчуванням:

  • Вхідний ключ - це довге ціле значення зміщення
  • Вхідний значення - це рядок тексту
  • Вихідний ключ - це рік
  • Вихідна значення - це температура повітря (ціле значення)

Метод map () отримує ключ і значення, перетворюючи текстове значення, що містить рядок введення, в рядок Java. Потім він використовує метод substring () для отримання значень. В цьому випадку Mapper записує рік як об'єкт Text (оскільки ми просто використовуємо його як ключ), а температура згортається в об'єкт типу IntWritable. Вихідна запис фіксується тільки в тому випадку, якщо температура присутній і код якості вказує на коректність показників температури.

Клас Reducer - це теж родової тип. Він має чотири формальних параметра типу, використовуваних для вказівки вхідних і вихідних типів. Вихідними типами функції reduce є Text і IntWritable. рік і його максимальна температура, яка виходить шляхом ітерації температур і порівняння кожної з них із записом найбільшого зі знайдених значень. Визначення класу Reducer наступне:

Результат буде виглядати так, як показано нижче, якщо є записи для декількох років.

На малюнках 2 і 3 показані результати виконання програми з використанням вхідного формату за замовчуванням (Text) в адміністративній консолі InfoSphere BigInsights.

Малюнок 2. Вкладка Application Status - виконання за замовчуванням

Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

На малюнку 4 показаний результат виконання програми MapReduce.

Малюнок 4. Вміст файлу відповіді - виконання за замовчуванням

Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

Використання спеціального вхідного формату

Тепер ми знову будемо використовувати клас Mapper, який є родовим типом, з CompressedCombineFileWritable в якості вхідного ключа, як показано нижче.

Клас Reducer залишається тим же, без будь-яких змін, і має наступне визначення:

Для визначення вхідного формату завдання ми використовуємо:

Приклади ключа і значення:

На малюнках 5 і 6 показаний результат використання спеціального вхідного формату (CompressedCombineFileInputFormat) в адміністративній консолі InfoSphere BigInsights.

Малюнок 5. Панель Application Status - виконання з використанням спеціального формату

Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

На малюнку 7 показаний результат виконання програми MapReduce.

Малюнок 7. Вміст файлу відповіді - виконання з використанням спеціального формату

Обробка невеликих стиснутих файлів в hadoop з використанням класу combinefileinputformat

висновок

Якщо є можливість, рекомендується уникати обробки безлічі невеликих файлів, оскільки MapReduce найкращим чином виконується в тому випадку, якщо може працювати зі швидкістю передачі даних дисків в кластері. Обробка безлічі невеликих файлів збільшує кількість операцій пошуку, які необхідні для виконання завдання. Якщо з якихось причин, ділового або стратегічного характеру, у вас є велика кількість невеликих файлів і доступна HDFS, то можна використовувати клас CombineFileInputFormat. CombineFileInputFormat не тільки корисний для роботи з невеликими файлами, але і може забезпечити підвищення продуктивності при обробці великих файлів.

Отримати продукти і технології