Hadoop Cluster Configuration Tips
Planning
Plan to use a centralized configuration management framework once your cluster approaches five nodes. Your approach should include both tools and processes for version control, copying files, and keeping cluster configurations in sync. Use monitoring tools such as Nagios to track the performance and health of systems throughout the cluster.
Network Setup
The network setup is crucial for a working Hadoop configuration. Don't configure Hadoop with IP addresses for the items in the cluster (jobtracker, namenode, slaves): Hadoop looks up the DNS name for each IP address and uses those names. Setting up the correct /etc/hosts files is important. Each node in the network should be able to resolve its own IP address to the correct DNS name, and should also be able to resolve the DNS name of every other node to its correct IP address.
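The round trip described above (name to IP address and back to the same name) can be checked with a short script. This is a minimal sketch using Python's standard socket module; the function name check_node and the injectable forward/reverse resolvers are illustrative assumptions, not part of Hadoop.

```python
import socket


def check_node(hostname, forward=socket.gethostbyname, reverse=socket.gethostbyaddr):
    """Return (ok, detail) for a forward + reverse lookup round trip.

    forward maps a host name to an IP address; reverse maps an IP address
    to a (name, aliases, addresses) tuple, like socket.gethostbyaddr.
    """
    try:
        ip = forward(hostname)
        reverse_name = reverse(ip)[0]
    except OSError as exc:  # socket.gaierror and socket.herror are OSErrors
        return False, f"{hostname}: lookup failed ({exc})"
    if reverse_name != hostname:
        return False, f"{hostname} -> {ip} -> {reverse_name} (mismatch)"
    return True, f"{hostname} -> {ip} -> {reverse_name}"
```

Calling check_node with the name of each node, on every node of the cluster, shows whether the name and address mappings agree everywhere. A mismatch usually points at an /etc/hosts entry to review.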
Another problematic case is the internal vs. external network interface. This can be configured with the parameters mapred.tasktracker.dns.interface and dfs.datanode.dns.interface.
Disabling IPv6 can resolve problems and improve throughput (add -Djava.net.preferIPv4Stack=true to HADOOP_OPTS).
Directories and Partitioning
This guide assumes the following good foundation: N>2 disks, 1 system partition mirrored across at least 2 disks, and N data partitions.
- Configure the big data directories (dfs.data.dir, mapred.local.dir) to use all N data partitions.
- Reserve enough space (dfs.datanode.du.reserved) for the temporary data of the biggest Hadoop job you expect.
- Log files and the general tmp folder should remain on the system partition, since they remain small and the system stays fully functional if one disk fails.
- The directories for the FileSystem-Image data should also remain on the system partition.
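Both dfs.data.dir and mapred.local.dir take a comma-separated list of directories, so spreading them over all N data partitions amounts to building one entry per partition. The following is a minimal sketch; the mount points and the subdirectory names (hadoop/dfs/data, hadoop/mapred/local) are hypothetical and should be replaced with your own layout.

```python
def data_dir_properties(mount_points,
                        dfs_subdir="hadoop/dfs/data",
                        mapred_subdir="hadoop/mapred/local"):
    """Build comma-separated property values, one directory per data partition.

    The subdirectory names are illustrative assumptions, not Hadoop defaults.
    """
    roots = [m.rstrip("/") for m in mount_points]
    return {
        "dfs.data.dir": ",".join(f"{r}/{dfs_subdir}" for r in roots),
        "mapred.local.dir": ",".join(f"{r}/{mapred_subdir}" for r in roots),
    }


# Hypothetical mount points for three data partitions.
for name, value in data_dir_properties(["/data1", "/data2", "/data3"]).items():
    print(f"{name}={value}")
```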
Backup
The data most critical to back up is the FileSystem-Image data managed by the namenode (dfs.name.dir: latest image + edit logs) and by the secondary namenode (fs.checkpoint.dir: latest image). There are various possibilities:
- Keep the data on multiple disks (on the master host), either by mirroring the system partition or by specifying multiple directories separated by commas.
- Start the secondary namenode on a second machine (so you have the latest image on two hosts and the edit logs on one host).
- Write one or both directories to an NFS share as well (Caution: pointing dfs.name.dir at an NFS share can slow down the namenode).
- Have an external backup mechanism which grabs the directories periodically.
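The last option, an external mechanism that grabs the directories periodically, could be as simple as a scheduled script that archives a directory under a timestamped name. This is a minimal sketch using Python's standard tarfile module; the function name and the archive naming scheme are assumptions. It does not coordinate with the namenode in any way, so whether a copy taken while the namenode is writing is usable is something you need to verify for your setup.

```python
import tarfile
import time
from pathlib import Path


def archive_directory(source_dir, backup_dir, now=None):
    """Write <backup_dir>/<dirname>-<UTC timestamp>.tar.gz and return its path.

    now is a Unix timestamp; None means the current time.
    """
    source = Path(source_dir)
    target_dir = Path(backup_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    stamp = time.strftime("%Y%m%d-%H%M%S", time.gmtime(now))
    target = target_dir / f"{source.name}-{stamp}.tar.gz"
    with tarfile.open(str(target), "w:gz") as archive:
        # Store paths relative to the directory name, not the absolute path.
        archive.add(str(source), arcname=source.name)
    return target
```

A scheduler such as cron could call archive_directory with the local paths configured in dfs.name.dir and fs.checkpoint.dir, writing to a disk or host other than the master.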
Replication
The replication factor of the data has an influence on several dimensions:
- Greater replication means less free disk space.
- Greater replication provides a greater chance of data locality and better performance.
- Greater replication means better data reliability.
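The disk space trade-off can be estimated with simple arithmetic: every block is stored once per replica, so the usable capacity is roughly the raw capacity divided by the replication factor. The sketch below is a rough estimate only; it ignores reserved space, temporary data, and anything else stored on the data partitions.

```python
def usable_capacity_tb(raw_capacity_tb, replication=3):
    """Approximate usable HDFS capacity for a given replication factor."""
    if replication < 1:
        raise ValueError("replication factor must be at least 1")
    return raw_capacity_tb / replication


# Example: 30 TB of raw disk space across the cluster.
for factor in (1, 2, 3):
    print(f"replication={factor}: about {usable_capacity_tb(30, factor):.1f} TB usable")
```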
Compression
- BLOCK compression can have huge benefits (both space and performance wise)
- Compression must be configured (mapred.compress.map.output, mapred.output.compress)
- Compression has different codecs, ZLib by default (native as well)
- LZO is not as good at compression as GZip, but it is a lot faster
  - Note that LZO uses a GPL license
Changing DFS Block Size
- The DFS block size defaults to 64 MB in Datameer.
- Open the Datameer application.
- Click the Administration tab at the top of the screen.
- Select Hadoop Cluster from the navigation bar on the left and Hadoop Cluster in the mode settings.
- In the custom properties box, type in the new block size:
dfs.block.size=[size]
The block size must be an integer number of bytes; it can't be a string value.
Example (134217728 bytes = 128 MB):
dfs.block.size=134217728
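Because the property expects a plain byte count, it helps to compute the value rather than type it by hand. A minimal sketch of the conversion (the helper names are illustrative):

```python
def block_size_bytes(megabytes):
    """Convert a block size in MB to the byte count dfs.block.size expects."""
    return megabytes * 1024 * 1024


def block_size_property(megabytes):
    """Format the custom property line for the given block size in MB."""
    return f"dfs.block.size={block_size_bytes(megabytes)}"


print(block_size_property(128))  # dfs.block.size=134217728
```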
Memory and Task Calculation
Setting the task count is essential for a well-functioning cluster. Together, mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum specify how many tasks can run on one node at a time. The optimal numbers depend on various factors:
- Available RAM
  - The tasktracker and datanode each take 1 GB of RAM by default
  - For each task, budget the amount of RAM set in mapred.child.java.opts (200 MB by default)
- Number of cores and average % of used CPU per task
  - One task per core
  - If you know that your tasks usually occupy only 75% of the CPU, the number of tasks can be greater than the number of cores
  - Example: 4 cores, jobs use ~75% of CPU, free slots = 5, maxMapTasks = 3, maxReduceTasks = 2
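The example above can be reproduced with a small calculation. This is a sketch under stated assumptions: the slot count is the number of cores divided by the average CPU share per task, rounded down, and the 3:2 split between map and reduce slots is inferred from the example rather than a rule stated by Hadoop. The RAM estimate uses the defaults listed above (1 GB each for tasktracker and datanode, 200 MB per task).

```python
def task_slots(cores, cpu_percent_per_task=100):
    """Slots per node: cores divided by the average CPU share of one task."""
    return max(1, cores * 100 // cpu_percent_per_task)


def split_slots(slots):
    """Split slots into (map, reduce) at roughly 3:2 (an assumed ratio)."""
    if slots < 2:
        raise ValueError("need at least 2 slots to run both map and reduce tasks")
    map_slots = (slots * 3 + 4) // 5  # integer ceiling of 3/5 * slots
    return map_slots, slots - map_slots


def node_ram_mb(slots, child_heap_mb=200, daemon_mb=1024):
    """RAM for tasktracker + datanode plus one child JVM heap per slot."""
    return 2 * daemon_mb + slots * child_heap_mb


slots = task_slots(cores=4, cpu_percent_per_task=75)
max_map, max_reduce = split_slots(slots)
print(slots, max_map, max_reduce, node_ram_mb(slots))  # 5 3 2 3048
```

If the RAM estimate exceeds what the node has available, lower the slot counts or the child heap size until it fits.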
Resource Allocation
Factors in resource allocation include CPU power, disk capacity, network speed, and which other processes are sharing a node. As you set up your system, record values, observe performance, make adjustments, and track changes. Are they improving performance or not? In a shared environment Hadoop can't effectively determine how to allocate data blocks or where to schedule tasks. Consider the network architecture and avoid spreading the Hadoop cluster across a data center.
If you see excessive swapping, long-running tasks, or out of memory failures, you need to change how you allocate resources.
Here are a few tips:
- If you have a CPU-intensive MapReduce application, start with higher values and adjust down as resources are maxed out.
- If you have an I/O intensive application, start with lower values and adjust up.
- Increasing DataNode and TaskTracker settings has an adverse impact on the RAM available to individual MapReduce tasks. On large hardware, they can be set high. In general, unless you have several dozen or more nodes working together, using higher settings wastes system resources like RAM that could be better applied to running your mapper and reducer code.
- The operations between Map and Reduce phases of a job can cause a lot of network traffic. Plan for it accordingly if you are using a shared network.
- Set up the Hadoop cluster with dedicated hardware if possible. For example, set up a dedicated switch or set of switches and a dedicated rack or set of racks to optimize performance when nodes communicate with each other.
- If you have processes that open many files, you may want to set the ulimit higher than the default. You can try using 16384 and adjust as you monitor performance. Use the command
  # ulimit -n 16384
  You can also set the ulimit for the hadoop user in /etc/security/limits.conf; this sets the value persistently. Enter the value
  hadoop hard nofile 16384
- If you have lots of RAM available, set io.sort.factor to 25 or 32 (up from 10). io.sort.mb should be 10 * io.sort.factor. Remember to multiply io.sort.mb by the number of concurrent tasks to determine how much RAM you’re allocating here, to prevent swapping. (So 10 task instances with io.sort.mb = 320 means you’re allocating 3.2 GB of RAM for sorting, up from 1.0 GB.)
- Try setting io.file.buffer.size to 65536.
- If the NameNode and JobTracker are on big hardware, try setting both dfs.namenode.handler.count and mapred.job.tracker.handler.count to 64.
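The sort buffer arithmetic from the io.sort.factor tip above can be sketched as follows. It applies the rule of thumb given there (io.sort.mb = 10 * io.sort.factor) and multiplies by the number of concurrent tasks on a node; the function name is illustrative.

```python
def sort_memory(io_sort_factor, concurrent_tasks):
    """Apply io.sort.mb = 10 * io.sort.factor and total it over all tasks."""
    io_sort_mb = 10 * io_sort_factor
    return {
        "io.sort.factor": io_sort_factor,
        "io.sort.mb": io_sort_mb,
        "total_sort_mb": io_sort_mb * concurrent_tasks,
    }


print(sort_memory(32, concurrent_tasks=10))  # total_sort_mb: 3200 (about 3.2 GB)
print(sort_memory(10, concurrent_tasks=10))  # total_sort_mb: 1000 (about 1.0 GB)
```

Compare the total against the RAM left on the node after the daemons and task heaps are accounted for, to avoid swapping.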
Job Scheduling
When not configured, Hadoop uses FIFO job scheduling. When several jobs are submitted, the first job uses all available resources (map and reduce slots) and the subsequent jobs have to wait for resources to be freed. Luckily, Hadoop's scheduler is pluggable and there are two major implementations: the Fair Scheduler and the Capacity Scheduler. They bring such features as:
- Share resources between queues--jobs can be submitted to specific queues which can have a specific resource share
- Share resources between users
- Priority jobs
Log Files
- Spend extra time at first getting to know the common patterns in your log files so you know where to look to find out if hardware is having a problem or why a task has taken longer than usual to run.
- Hadoop Daemon Logs: File names use the pattern hadoop-<user-running-hadoop>-<daemon>-<hostname>.log. In them, look for cluster-specific errors and warnings that might have to do with daemons running incorrectly.
- Job Configuration XML: These logs are stored in /var/log/hadoop and /var/log/hadoop/history. The XML file describes the job configuration. The /hadoop file names are constructed as follows: job_<job_ID>_conf.xml. These files might be of interest to developers rather than system administrators, because their contents are job-specific. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
- Job Statistics: Files are named <hostname><epoch-of-jobtracker-start(in milliseconds)><job-id>_<job-name>. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
- Standard Error (for a particular task attempt): These logs are stored in /var/log/hadoop/userlogs/attempt_<job-id><map-or-reduce><attempt-id>. They contain information written to standard error (stderr) captured when a task attempt is run. These logs can be used for debugging. For example, a developer can include System.err.println calls in the job code. The output appears in the standard error files. Note that <map-or-reduce> is either “m” if the task attempt was a mapper, or “r” if the task attempt was a reducer. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
- Standard Out (for a particular task attempt)
- log4j informational messages from within the task process containing Hadoop internal diagnostic information. You can clear these logs periodically without affecting Hadoop. However, don't move or delete a file that is currently being written to by a running job.
The Hadoop components produce the following logs when Hadoop is installed from CDH RPM or DEB packages:
- On the jobtracker:
  - /var/log/hadoop
    - /hadoop-* = daemon logs
    - /job_*.xml = job configuration XML logs
    - /history
      - /*_conf.xml = job configuration logs
      - <everything else> = job statistics logs
- On the namenode:
  - /var/log/hadoop
    - /hadoop-* = daemon logs
- On the secondary namenode:
  - /var/log/hadoop
    - /hadoop-* = daemon logs
- On the datanode:
  - /var/log/hadoop
    - /hadoop-* = daemon logs
- On the tasktracker:
  - /var/log/hadoop
    - /hadoop-* = daemon logs
    - /userlogs
      - /attempt_*
        - /stderr = standard error logs
        - /stdout = standard out logs
        - /syslog = log4j logs
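The layout above can be turned into a small classifier, which is handy when scripting log cleanup or collection. This is a minimal sketch that only encodes the patterns listed above for a package installation under /var/log/hadoop; the function name and the returned labels are illustrative, and the file names in the usage lines are made up.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath

LOG_ROOT = PurePosixPath("/var/log/hadoop")
TASK_LOGS = {"stderr": "standard error", "stdout": "standard out", "syslog": "log4j"}


def classify_log(path):
    """Map a path under /var/log/hadoop to the log type from the list above."""
    try:
        parts = PurePosixPath(path).relative_to(LOG_ROOT).parts
    except ValueError:  # not under the log root
        return "unknown"
    if not parts:
        return "unknown"
    if len(parts) == 1:
        if fnmatchcase(parts[0], "hadoop-*"):
            return "daemon"
        if fnmatchcase(parts[0], "job_*.xml"):
            return "job configuration XML"
    elif parts[0] == "history" and len(parts) == 2:
        if fnmatchcase(parts[1], "*_conf.xml"):
            return "job configuration"
        return "job statistics"
    elif parts[0] == "userlogs" and len(parts) == 3 and fnmatchcase(parts[1], "attempt_*"):
        return TASK_LOGS.get(parts[2], "unknown")
    return "unknown"


# Hypothetical file names, for illustration only.
print(classify_log("/var/log/hadoop/hadoop-hadoop-namenode-host1.log"))
print(classify_log("/var/log/hadoop/userlogs/attempt_example/stderr"))
```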
Tools, tutorials, and extra features
See: Hadoop Tutorials and Extra Features for a list of tutorial links and an introduction to extra features that can be enabled through configuration.
Other tips:
- Don't set up $HADOOP_HOME to be an NFS mount. This acts like a denial of service attack on the NFS server.
- Make sure the domain name service (DNS) in the cluster is configured correctly.
- RAID is often turned on by default although it isn't recommended for use with Hadoop. You might need to turn it off.
Required Configuration
hadoop-site.xml templates
conf/core-site.xml
Name | Value | Reasoning |
---|---|---|
| hdfs://<masterhost>:9000 | |
| <comma-separated list of local folders where the secondary namenode stores its images> | |
| <local folder where Hadoop daemons store their temporary data and, depending on your default settings, some permanent data> | |
conf/mapred-site.xml
Name | Value | Reasoning |
---|---|---|
| <masterhost>:9001 | |
| <how many map tasks run concurrently per tasktracker> | |
| <how many reduce tasks run concurrently per tasktracker> | |
| -Xmx500m | Datameer needs a minimum of 500 MB per task JVM with its default configuration. Interacts with |
| <comma-separated list of all folders where the big temporary data should go> | |
| <path in HDFS where small control files are stored> | |
conf/hdfs-site.xml
Name | Value | Reasoning |
---|---|---|
| <comma-separated list of all folders where the HDFS data should go> | |
| <comma-separated list of local folders where the namenode stores its nametable (fsimage)> | |
| | |
Optional Configuration
Name | Value | Default | Influence | Location |
---|---|---|---|---|
| 3 | Greater replication factor means less free disk space but greater data reliability and availability (and a lesser value means the opposite) | | |
| number of minutes deleted files are kept in trash | 0 (trash feature is disabled) | Provides a rescue time for erroneous deletes | |
| Boolean | true | Slow tasks are re-executed in parallel | |
| (eth0, eth1, ...) | default (first one) | Use internal instead of external interface | |
| (eth0, eth1, ...) | default (first one) | Use internal instead of external interface | |
| true/false | true | Enables/disables Unix-like permissions on HDFS. Enabling permissions usually makes things harder to work with while bringing limited advantages (it is less about securing things than about preventing users from mistakenly messing up other users' data) | |
Tuning Configuration
Name | Recommended value | Default | Influence | Location |
---|---|---|---|---|
| (-1 is unlimited) | 1 | When the task JVM is reused between tasks of a job, task-preparation time is saved. | |
| >100 (dependent on available memory) | 100 | Amount of buffer memory for sorting, in MB | |
| 50 (for io.sort.mb=100) | 10 | Number of merge streams for sorting (highly dependent on io.sort.mb; each thread has io.sort.mb/io.sort.factor memory available) | |
| 64MB, 128MB, 512MB | 64MB | | |
| 4096 | 256 | | |
| 65536 | 4096 | | |
Datameer-Specific Configuration
Name | Recommended Value | Description | Location |
---|---|---|---|
| false | Datameer currently doesn't support speculative execution. However, you don't need to configure these properties cluster-wide, since Datameer disables speculative execution for every job it submits. You only need to make sure that these properties aren't set cluster-wide to true with the final parameter also set to true, which would prevent a client from changing the property on a per-job basis. | |
| usually one of: | Datameer cares about the what and the when of compression but not about the how, that is, the codec. It uses the codec you configured for the cluster. If you've configured a non-standard codec like LZO, you have to make sure that Datameer has access to the codec as well. See Frequently Asked Hadoop Questions#Q. How do I configure Datameer/Hadoop to use native compression? | |