MapReduce过程中shuffle数据拉取失败(error in shuffle in fetcher)

MapReduce过程中shuffle数据拉取失败。

shuffle

问题

在运行MapReduce时,ETL反映了如下的错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#38
at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:56)
at org.apache.hadoop.io.BoundedByteArrayOutputStream.<init>(BoundedByteArrayOutputStream.java:46)
at org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput.<init>(InMemoryMapOutput.java:63)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.unconditionalReserve(MergeManagerImpl.java:305)
at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.reserve(MergeManagerImpl.java:295)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyMapOutput(Fetcher.java:514)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.copyFromHost(Fetcher.java:336)
at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:193)


资料

过程分析

下文引用自该博客

根据《Hadoop:The Definitive Guide 4th Edition》所述(P203-219),map任务和reduce任务之间要经过一个shuffle过程,该过程复制map任务的输出作为reduce任务的输入
具体的来说,shuffle过程的输入是:map任务的输出文件,它的输出接收者是:运行reduce任务的机子上的内存buffer,并且shuffle过程以并行方式运行
参数mapreduce.reduce.shuffle.input.buffer.percent控制运行reduce任务的机子上多少比例的内存用作上述buffer(默认值为0.70),参数mapreduce.reduce.shuffle.parallelcopies控制shuffle过程的并行度(默认值为5)
那么”mapreduce.reduce.shuffle.input.buffer.percent” * “mapreduce.reduce.shuffle.parallelcopies” 必须小于等于1,否则就会出现如上错误
因此,我将mapreduce.reduce.shuffle.input.buffer.percent设置成值为0.1,就可以正常运行了(设置成0.2,还是会抛同样的错)

另外,可以发现如果使用两个参数的默认值,那么两者乘积为3.5,大大大于1了,为什么没有经常抛出以上的错误呢?

  • 首先,把默认值设为比较大,主要是基于性能考虑,将它们设为比较大,可以大大加快从map复制数据的速度
  • 其次,要抛出如上异常,还需满足另外一个条件,就是map任务的数据一下子准备好了等待shuffle去复制,在这种情况下,就会导致shuffle过程的“线程数量”和“内存buffer使用量”都是满负荷的值,自然就造成了内存不足的错误;而如果map任务的数据是断断续续完成的,那么没有一个时刻shuffle过程的“线程数量”和“内存buffer使用量”是满负荷值的,自然也就不会抛出如上错误
  • 另外,如果在设置以上参数后,还是出现错误,那么有可能是运行Reduce任务的进程的内存总量不足,可以通过mapred.child.java.opts参数来调节,比如设置mapred.child.java.opts=-Xmx2024m

reduce会在map执行到一定比例启动多个fetch线程去拉取map的输出结果,放到reduce的内存、磁盘中,然后进行merge。当数据量大时,拉取到内存的数据就会引起OOM,所以此时要减少fetch占内存的百分比,将fetch的数据直接放在磁盘上。

mapreduce.reduce.shuffle.memory.limit.percent:每个fetch取到的map输出的大小能够占的内存比的大小。默认是0.25。因此实际每个fetcher的输出能放在内存的大小是reducer的java heap size*0.9*0.25。

源码分析

下文引用自该博客

结合hadoop 2.2.0的源代码来对整个失败过程进行简要分析。
从代码分析来看,最底层Fetcher.run方法执行时出现的错误,在Shuffle.run方法中,会启动一定数量的Fetcher线程(数量由参数mapreduce.reduce.shuffle.parallelcopies决定,我们配置的事50个,是不是有点多,默认是5),Fetcher线程用来从map端copy数据到Reducer端本地。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];  
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}

// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();

synchronized (this) {
if (throwable != ) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}

当任意一个Fetcher发生异常时,就会在scheduler的等待后能够在主线程发现,停掉整个Reducer。

1
2
3
4
5
6
7
8
9
10
11
public synchronized void reportException(Throwable t) {  
if (throwable == ) {
throwable = t;
throwingThreadName = Thread.currentThread().getName();
// Notify the scheduler so that the reporting thread finds the
// exception immediately.
synchronized (scheduler) {
scheduler.notifyAll();
}
}
}

在异常堆栈发生的地方,Fetcher中调用copyFromHost方法,调用到Fetcher的114行,merger.reserve方法会调用MergerManagerImpl.reserve

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override  
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
long requestedSize,
int fetcher
) throws IOException {
if (!canShuffleToMemory(requestedSize)) {
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
" is greater than maxSingleShuffleLimit (" +
maxSingleShuffleLimit + ")");
return new OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,
jobConf, mapOutputFile, fetcher, true);
}
...
}

重点是这个canShuffleToMemory方法,它会决定是启动OnDiskMapOutput还是InMemoryMapOutput类,标准就是需要的内存数量小于设置的限制。

1
2
3
private boolean canShuffleToMemory(long requestedSize) {  
return (requestedSize < maxSingleShuffleLimit);
}

在初始化MergerManageImpl的时候设置了这个限制,MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES(mapreduce.reduce.memory.totalbytes)这个参数我们并没有设置,
因此使用的是Runtime.getRuntime.maxMemory()*maxInMemCopyUse, MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT(mapreduce.reduce.shuffle.input.buffer.percent)
参数使用的是0.70,也就是最大内存的70%用于做Shuffle/Merge,比如当前Reducer端内存设置成2G,那么就会有1.4G内存。

1
2
3
4
5
6
7
8
9
10
11
final float maxInMemCopyUse =  
jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
this.memoryLimit =
(long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
* maxInMemCopyUse);
final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
this.maxSingleShuffleLimit =
(long)(memoryLimit * singleShuffleMemoryLimitPercent);

而单个Shuffle最大能够使用多少内存,还需要再乘一个参数:MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT(mapreduce.reduce.shuffle.memory.limit.percent),
我们当前并没有设置这个参数,那么默认值为0.25f,此时单个Shuffle最大能够使用1.4G*0.25f=350M内存。
InMemory会在初始化时接收一个size参数,这个size的计算方式暂时未知,用于初始化其BoundedByteArrayOutputStream,

1
2
3
4
5
6
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,  
MergeManagerImpl<K, V> merger,
int size, CompressionCodec codec,
boolean primaryMapOutput) {
...
}

这个size也就是BoundedByteArrayOutputStream作为byte[]的大小:

1
2
3
public BoundedByteArrayOutputStream(int capacity, int limit) {  
this(new byte[capacity], 0, limit);
}

OOM也就是出现在这一行。
而我们出的错可能就是出现在判定为使用InMemoryMapOutput但是分配内存时出现的错误,试想使用50个Fetcher线程,单个线程设置为最大接收350M,而堆的最大内存为2G,这样只要有7个Fetcher线程判断为使用InMemoryMapOutput,且同时开始接收数据,就可能造成Java Heap的OOM错误,从而导致Shuffle Error。
我觉得我们可以对使用的参数进行一定的调整,比如说减少Fetcher线程的数量,减少单个Shuffle使用InMemory操作的比例让其OnDisk操作等等,来避免这个问题。


解决

情况

集群指定的相关参数如下:

  • mapred.child.java.opts=-Xmx1024m
  • mapreduce.reduce.memory.mb=3072
  • mapreduce.reduce.shuffle.parallelcopies=50

mapreduce.reduce.shuffle.input.buffer.percent未指定,默认值为0.7
mapreduce.reduce.shuffle.memory.limit.percent未指定,默认值为0.25
因此实际单个线程最大内存为179.2mb,由于线程数量限制在50,当第18个线程开启的时候就会报OOM了。

措施

  • 限制shuffle阶段的内存使用,让fetch的数据尽量放置在磁盘上
    • hive:set mapreduce.reduce.shuffle.memory.limit.percent=0.1;
    • MR:job.getConfiguration().setStrings(“mapreduce.reduce.shuffle.memory.limit.percent”, “0.1”);
  • 增加JVM的heap memory
  • 减少允许的最大线程数量mapreduce.reduce.shuffle.parallelcopies=50
如果文章对您有帮助,感谢您的赞助支持!