Оптимизация запросов к Hive TEZ

Рассмотрим несколько вариантов улучшения производительности и оптимизация запросов к Hive. Если не заниматься этим вопросом, то даже простые запросы select к относительно небольшим таблицам будут рассчитываться несколько минут. Один раз я ожидала 7 часов расчет запроса с одной-единственной оконной функцией. Однако, следуя некоторым советам, можно ускорить расчеты на 50% и более. Все советы взяты из статьи и сама эта статья является вольным переводом указанной статьи.

Не устанавливайте количество reducer-ов вручную

Когда Tez исполняет запрос, он сперва определяет необходимое количество редьюсеров и автоматически корректирует в зависимости от количества обработанных байтов. Мы можем использовать параметр mapred.reduce.tasks. По умолчанию он равен -1, что означает автоматический выбор количества редьюсеров. Рекомендуется не изменять его и не указывать определенное количество вручную. Это позволит Tez выставлять нужное значение в зависимости от ситуации.

set mapred.reduce.tasks = 38; (Not Recommended)
set mapred.reduce.tasks = -1; (Recommended)

Мы можем использовать команду «ANALYZE TABLE .. COMPUTE STATISTICS», чтобы посмотреть, сколько мапперов и редьюсеров использует Tez для расчета. Также эта команда позволяет понять, где застревает запрос.

Еще мы можем включить CBO и векторизацию и с помощью запроса EXPLAIN оценить объем передачи данных через редукторы.

Если мы не используем mapred.reduce.tasks = -1, то нам нужно вычислить количество редьюсеров самостоятельно. Оно вычисляется по такой формуле (понятия не имею, что это значит, но именно ее использует Tez при автоматическом расчете):

Max(1, Min(hive.exec.reducers.max [1099], ReducerStage estimate/hive.exec.reducers.byte.per.reducer)) x hive.tez.max.partition.factor [2]

Настройки Tez для оптимизации запросов SELECT

set hive.optimize.index.filter=true;

Эта штука оптимизирует запросы вида «SELECT … FROM … WHERE». Космически полезная штука.

set hive.fetch.task.conversion=more;

Это оптимизирует выполнение запросов с оператором LIMIT. Тоже космически полезная штука, когда нужно просто проверить данные в таблице. Говорят, ускоряет вычисление до <1 секунды.

set hive.compute.query.using.stats=true;

Эта команда помогает ускорить вычисление запроса вида «select count (1) from table;»

Использование векторизации

Векторизация запросов помогает ускорить выполнение таких операторов как scans, aggregations, filters и joins, выполняя их батчами по 1024 строки за раз вместо одного ряда каждый раз. Векторизация появилась в версии Hive 0.13 и позволила существенно ускорить выполнение запросов. Чтобы подключить ее, нужно выполнить две команды:

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

Использование CBO — Cost Based Optimization

Построение DAG-а выполнения запроса «по умолчанию» не основана на оптимизации по стоимости запроса (которая включает как время, таки и использованную память). CBO позволяет выполнять дальнейшую оптимизацию на основе стоимости запроса, что приводит к другим решениям, таким как изменение порядка джойнов, какой тип джойна выполнять сначала, степень параллелизма и другие. Чтобы подключить эту функцию, нужно выполнить следующие команды:

set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;

Теперь мы можем подготовить данные для CBO, выполнив команду Hive ANALIZE для сбора различной статистики по таблицам, для которых мы хотим использовать CBO.

analyze table table_name compute statistics;
-- or as example:
analyze table tweets compute statistics for columns sender, topic;
-- or after Hive 0.14:
analyze table tweets compute statistics for columns;

Теперь выполнение запроса с использованием этой таблицы должно привести к другому плану выполнения, который быстрее из-за расчета стоимости.

Конфигурирование размера Tez Container

Джойны обрабатываются достаточно эффективно, поскольку одну таблицу (поменьше) Tez помещает в память в качестве хэш-карты в каждой ноде, а вторую таблицу (побольше) разносит по нодам для распределенных вычислений. Благодаря этому минимизируется пересылка данных, и запросы отрабатывают быстро. Однако, чтобы фокус удался, в контейнере должно быть достаточно памяти, чтобы загрузить в нее первую таблицу (которая «поменьше»).

Установить размер контейнера Tez, чтобы он был больше по размеру контейнера YARN (4 ГБ):

SET hive.tez.container.size=4096MB

Установить объем памяти под таблицы, сохраняемые в виде хэш-карты (рекомендуется треть от размера контейнера):

SET hive.auto.convert.join.noconditionaltask.size=1370MB

Если нет присоединения к карте, то нужно проверить размер контейнеров Tez относительно контейнеров YARN. Он должен быть кратным. Например, если контейнеры YARN установлены на 2 ГБ, установите размер контейнера Tez на 4 ГБ. Затем выполните команду EXPLAIN, чтобы просмотреть план выполнения запроса и убедитесь, что мы получаем map joins вместо shuffle joins. Также важно, что если контейнеры Tez слишком большие, то эта память простаивает.

Избегайте глобальной сортировки

Глобальная сортировка реализуется условием ORDER BY. Однако такой запрос рассчитывается неэффективно, так как для расчета устанавливается один редьюсер. Если глобальной сортировки не требуется, лучше использовать условие SORT BY, которое рассчитает сортированный результат для каждого редьюсера. Если при этом нам нужно знать, какой редьюсер рассчитывал конкретную строку, то мы можем использовать DISTRIBUTE BY. Например:

SELECT id, name, salary, dept FROM employee
DISTRIBUTE BY dept
SORT BY id ASC, name DESC;

Здесь dept будет обрабатываться отдельным редуктором, а записи будут сортироваться по полям id и name для каждого dept тоже отдельно.

Оптимизируем оператор LIMIT

По умолчанию этот оператор сначала выполняет запрос, а потом выдает заданное количество строк. Понятно, что такое поведение избыточно. Чтобы избежать этого, можно сделать такие настройки:

hive.limit.optimize.enable = true
-- следует ли попробовать взять сначала небольшой набор данных и на нем выполнить простой limit

hive.limit.row.max.size = 100000
-- какой размер данных нам брать в качестве "небольшого"

hive.limit.optimize.limit.file = 10
-- наибольшее количество файлов, которое мы при этом используем

hive.limit.optimize.fetch.max = 50000
-- максимальное количество строк нашего небольшого набора данных для простого limit, если это запрос выборки. Для запросов insert это ограничение не срабатывает.

Про параллельность

Hive разделяет запрос на одну или несколько стадий. Это могут быть стадии map-reduce, стадии merge, limit или еще что-нибудь. По умолчанию стадии выполняются по очереди. Однако для некоторых частных случаев стадии можно распараллелить. Для этого используются такие настройки:

hive.exec.parallel = true
-- следует ли выполнять параллельно

hive.exec.parallel.thread.number = 8
-- сколько джобов может выполнять параллельно

Про ORC формат файлов

Использование Optimized Record Columnar формата помогает значительно улучшить производительность выполнения запросов в Hive. Достаточно посмотреть на картинку.

Оптимизация JOIN

Для JOIN есть две направления оптимизации — Auto Map Join и Skew Join (данные неровно распределены на кластере).

Auto Map Join особенно полезен, если мы джойним большую таблицу с маленькой. Если мы включим эту опцию, то маленькая таблица сохранится на каждой ноде и join произойдет на стадии map.

hive.auto.convert.join = true
-- Следует ли оптимизировать запрос, конвертируя обычный join в map-join, основываясь на размере входных данных

hive.auto.convert.join.noconditionaltask = true
-- Если этот параметр включен и суммарный размер n-1 таблиц-партиций в n-местном join меньше некого заданного размера, то этот join напрямую конвертируется в mapjoin (безо всяких условных задач).

hive.auto.convert.join.noconditionaltask.size = 10000000
Если hive.auto.convert.join.noconditionaltask=false, то этот параметр не работает. А если true, то это тот самый "некий заданный размер". По умолчанию - 10MB.

hive.auto.convert.join.use.nonstaged = false
Для условных джойнов, если входной поток маленького алиаса может быть применен к оператору джойн без фильтрации и проекций, этот алиас не нужно будет предобрабатывать в распределенном кэше через локальную mapred задачу. Это не будет работать с векторизацией или tezexecution engine.

Небольшой пример условного join:

SELECT a.ID, num, b.ID, N1, N2, N3, N4
FROM dbo.SampleLeftTable a
JOIN dbo.ConditionalJoinExample b
ON num = CASE
            WHEN num BETWEEN 1 AND 25 THEN b.N1
            WHEN num BETWEEN 26 AND 50 THEN b.N2
            WHEN num BETWEEN 51 AND 75 THEN b.N3
            ELSE b.N4
            END;

Чтобы включить оптимизацию для skew join:

hive.optimize.skewjoin = true
-- Алгоритм следующий: во время выполнения определяются ключи с большой асиметрией. Вместо исполнения таких ключей, они временно сохраняются в hdfs директории. В последующих map-reduce джобах обрабатываются эти асиметричные ключи. Один и тот же ключ не должен быть асиметричен для всех таблиц и тогда последующие map-reduce будут существенно ускорены сравнительно с map-join.

hive.skewjoin.key = 100000
Определим, что такое асиметричный ключ. Если мы видим более чем заданное количество строк с тем же ключом в операторе join, то мы считаем ключ асиметричным.

hive.skewjoin.mapjoin.map.tasks = 10000
Определим количество задач, используемых для последующего map join для асиметричного join. Эта команда используется вместе с hive.skewjoin.mapjoin.min.split, чтобы обеспечить детальный контроль.

hive.skewjoin.mapjoin.min.split = 33554432
Определим количество map задач используемых в дальнейшем для асиметричного join как минимальный размер сплита. Нужно использовать эту настройку вместе с hive.skewjoin.mapjoin.map.tasks

На этом всё 🙂

Добавить комментарий