hive调优

Set hive.limit.optimize.enable=true;--(默认为false)
Set hive.limit.row.max.size=100000;--(limit最多可以查询多少行,根据需求可以调大)
Set hive.limit.optimize.limit.file=10;--(一个查询可以操作的最多文件数,根据需要适当调大)
Set hive.limit.optimize.fetch.max=50000;--(fetch query,直接select from,能够获取的最大行数)

设置压缩

--设置是否启动map输出的压缩机制,默认为false。在需要减少网络传输的时候,可以设置为true。
set mapreduce.map.output.compress=true;
--Hive的Map-Reduce之间是否进行压缩.压缩编解码器和其他选项由 上面Job中的变量mapreduce.output.fileoutputformat.compress.*确定
set hive.exec.compress.intermediate=true;
--控制是否压缩查询的最终输出(到 local/hdfs 文件或 Hive table)压缩编解码器和其他选项由 上面Job中的变量mapreduce.output.fileoutputformat.compress.*确定
set hive.exec.compress.output=true;

小文件合并

--默认值为true。表示是否合并Map输出文件。
set hive.merge.mapfiles=true;
--默认值为false。表示是否合并Reduce输出文件。
set hive.merge.mapredfiles=true;
--默认值为256000000,单位字节。表示合并文件的大小。
set hive.merge.size.per.task=268435456;
--当输出文件的平均大小小于此设置值时,启动一个独立的map-reduce任务进行文件merge,默认值为16M。
set hive.merge.smallfiles.avgsize=16777216;
--在map开始前进行合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

--ORC可以使用新的HDFS缓存API和ZeroCopy读取器来避免在扫描文件时将额外的数据复制到内存中。
set hive.exec.orc.zerocopy=true;
--在Hive的一些复杂关联查询中,可能同时还包含有group by等能够触发shuffle的操作,有些时候shuffle操作是可以共享的,通过关联优化器选项,可以尽量减少复杂查询中的shuffle,从而提升性能。
set hive.optimize.correlation=true;
-- 并行运行表示同步执行Hive的多个阶段。Hive在执行过程中,将一个查询转化成一个或者多个阶段。某个特定的Job可能包含多个阶段,而这些阶段可能并非完全相互依赖的,也就是可以并行运行的,这样可以使得整个Job的运行时间缩短。您可以通过设置以下参数,控制不同的作业是否可以同时运行。
set hive.exec.parallel=true;
--默认值为8。表示允许同时运行线程的最大值。
hive.exec.parallel.thread.number
-- 设置名称
set mapred.job.name=my_job_{DATE};
-- 动态分区
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;

询语句跳过MapReduce程序

--Fetch task
--您可以通过设置以下参数,在执行查询等语句时,不执行MapReduce程序,以减少等待时间
--默认值为none。参数取值如下:
--none:关闭Fetch task优化。
--在执行语句时,执行MapReduce程序。
--minimal:只在SELECT、FILTER和LIMIT的语句上进行优化。
--more:在minimal的基础上更强大,SELECT不仅仅是查看,还可以单独选择列,FILTER也不再局限于分区字段,同时支持虚拟列(别名)
set hive.fetch.task.conversion=minimal;

--开启向量化,执行查询等语句时,不执行MapReduce程序,以减少等待时间
--默认值为true。开启向量化查询的开关。
set hive.vectorized.execution.enabled=true;
--默认值为true。表示是否启用Reduce任务的向量化执行模式。
set hive.vectorized.execution.reduce.enabled=true;

表连接数据倾斜优化

skewjoin原理

  1. 对于skewjoin.key,在执行job时,将它们存入临时的HDFS目录,其它数据正常执行

  2. 对倾斜数据开启map join操作(多个map并行处理),对非倾斜值采取普通join操作

  3. 将倾斜数据集和非倾斜数据集进行合并Union操作。

--如果大表和大表进行join操作,则可采用skewjoin(倾斜关联)来开启对倾斜数据的优化。
set hive.optimize.skewjoin=true; --默认关闭。
--开启skewin以后,设置多大的数据才会被认为是数据倾斜
set hive.skewjoin.key=100000;
--Union时优化
set hive.optimize.union.remove=true; --默认关闭。
-- 此项配置减少对Union all子查询中间结果的二次读写,可以避免union输出的额外扫描过程,当我们开启了skewjoin时尤其有用,建议同时开启。
set hive.optimize.skewjoin=true;
set hive.optimize.skewjoin.compiletime=true;
set hive.optimize.union.remove=true;
--Map阶段聚合 默认开启
hive.map.aggr=true;
--(用于设定Map端进行聚合操作的条目数)
set hive.groupby.mapaggr.checkinterval=100000;
--当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,Map的输出结果集合会随机分布到Reduce中, 每个部分进行聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的Reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key分布到同一个Reduce中),最后完成最终的聚合操作
set hive.groupby.skewindata=true;
-- 开启本地MR,默认值false
set hive.exec.mode.local.auto=false;

map、reduce CPU 内存调优

--默认参数,表示JVM堆内存。	-Xmx2048m
set mapreduce.map.java.opts=-Xmx2048m;
--默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。
set mapreduce.map.memory.mb=2304;
--默认参数,表示JVM堆内存。
set mapreduce.reduce.java.opts=-Xmx2048m;
--默认参数,表示整个JVM进程占用的内存,计算方法为堆内存+堆外内存=2048+256。
set mapreduce.reduce.memory.mb=2304;
--每个Map任务可用的最多的CPU Core数目。
set mapreduce.map.cpu.vcores=5;
--每个Reduce任务可用的最多的CPU Core数目。此设置在公平队列是不生效的,通常vCores用于较大的集群,以限制不同用户或应用程序的CPU。
mapreduce.reduce.cpu.vcores=10;

yarn-site.xml

<property>
  <name>mapreduce.map.memory.mb</name>
  <value>3096</value>
</property>

hive-env.sh

export HIVE_METASTORE_HEAPSIZE=4096
export HIVE_SERVER2_HEAPSIZE=4096
export HADOOP_HEAPSIZE=4096

Task数量优化

Map Task数量优化

在分布式计算系统中,决定Map数量的一个因素就是原始数据,在不加干预的情况下,原始数据有多少个块,就可能有多少个起始的Task,因为每个Task对应读取一个块的数据;当然这个也不是绝对的,当文件数量特别多,并且每个文件的大小特别小时,您就可以限制减少初始Map对相应的Task的数量,以减少计算资源的浪费,如果文件数量较少,但是单个文件较大,您可以增加Map的Task的数量,以减小单个Task的压力。

通常,Map Task数量是由mapred.map.tasks、mapred.min.split.size和dfs.block.size决定的。

Hive的文件基本上都是存储在HDFS上,而HDFS上的文件,都是分块的,所以具体的Hive数据文件在HDFS上分多少块,可能对应的是默认Hive起始的Task的数量,使用default_mapper_num参数表示。使用数据总大小除以dfs默认的最大块大小来决定初始默认数据分区数。

相关参数

set mapred.max.split.size=2048000000;
set mapred.min.split.size=1024000000;

a. 初始默认的Map Task数量,具体公式如下。

default_mapper_num = total_size/dfs.block.size

b. 计算Split的size,具体公式如下。

default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

上述公式中的mapred.min.split.sizemapred.max.split.size,分别为Hive计算的时Split的最小值和最大值。

c. 将数据按照计算出来的size划分为数据块,具体公式如下。

split_num = total_size/default_split_size;

d. 计算的Map Task数量,具体公式如下。

map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))

从上面的过程来看,Task的数量受各个方面的限制,不至于Task的数量太多,也不至于Task的数量太少。如果需要提高Task数量,就要降低mapred.min.split.size的数值,在一定的范围内可以减小default_split_size的数值,从而增加split_num的数量,也可以增大mapred.map.tasks的数量。

重要 Hive on TEZ和Hive on MR使用是有差异的。例如,在Hive中执行一个Query时,可以发现Hive的执行引擎在使用Tez与MR时,两者生成的mapper数量差异较大。主要原因在于Tez中对inputSplit做了grouping操作,可以将多个inputSplit组合成更少的groups,然后为每个group生成一个mapper任务,而不是为每个inputSplit生成一个mapper任务。

Reduce Task数量优化

通过hive.exec.reducers.bytes.per.reducer参数控制单个Reduce处理的字节数。

Reduce的计算方法如下。

reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。

通过mapred.reduce.tasks参数来设置Reduce Task的数量。

说明 在TEZ引擎模式下,通过命令set hive.tez.auto.reducer.parallelism = true;,TEZ将会根据vertice的输出大小动态预估调整Reduce Task的数量。

同Map一样,启动和初始化Reduce也会消耗时间和资源。另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,并且这些小文件作为下一个任务的输入,则会出现小文件过多的问题。

开启MapJoin

MapJoin优势在于没有shuffle

--是否自动转换为mapjoin
set hive.auto.convert.join = true;
--小表的最大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize = 25000000;
--是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask = true;
--多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。同时hive.auto.convert.join.noconditionaltask必须为true
set hive.auto.convert.join.noconditionaltask.size = 10000000;

mapjoin实现的方式:
1)在Map-Reduce的驱动程序中使用静态方法DistributedCache.addCacheFile()增加要拷贝的小表文件。 JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
2)在Map类的setup方法中使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。


hive调优
https://www.hechunyu.com/archives/1698221879385
作者
chunyu
发布于
2023年06月13日
许可协议