Practices on Dynamic Scheduling of Spark Resources

Last updated: 2020-04-15 10:41:35

Preparations for Development

Confirm that you have activated Tencent Cloud and created an EMR cluster. When creating the EMR cluster, you need to select the spark_hadoop component on the software configuration page.

Spark is installed in /usr/local/service/spark on the CVM instances of the EMR cluster.

Copying the JAR Package

You need to copy spark-<version>-yarn-shuffle.jar to the /usr/local/service/hadoop/share/hadoop/yarn/lib directory of all nodes in the cluster.

Method 1. Use the SSH Console

  1. On the left sidebar, select Component Management and click the YARN component to enter the YARN component management page.

  2. On the YARN component management page, select Role Management and confirm the IP of the node where NodeManager resides.

  3. Log in to the nodes where NodeManager resides one by one.

    • First, log in to any node (preferably a master node) in the EMR cluster. For more information on how to log in to EMR, please see Logging in to Linux Instances. Here, you can log in by using XShell.
    • From that node, use SSH to log in to each of the other nodes where NodeManager resides. The command is ssh $user@$ip, where $user is the login username and $ip is the remote server IP (i.e., the IP address confirmed in step 2).
    • Verify that you are now on the target node.
  4. Search for the path of the spark-<version>-yarn-shuffle.jar file.

  5. Copy spark-<version>-yarn-shuffle.jar to /usr/local/service/hadoop/share/hadoop/yarn/lib.

  6. Log out and switch to other nodes.
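
Steps 4 and 5 can be sketched as two small shell functions to run on each node. This is a minimal sketch, not the official procedure: the function names find_shuffle_jar and install_shuffle_jar are hypothetical, and the default search root assumes the Spark installation path given earlier in this document.

```shell
#!/bin/bash
# Print the path of the first spark-<version>-yarn-shuffle.jar found under a search root.
# The default root is the Spark installation path named in this document.
find_shuffle_jar() {
  local root="${1:-/usr/local/service/spark}"
  find "$root" -type f -name 'spark-*-yarn-shuffle.jar' 2>/dev/null | head -n 1
}

# Copy the JAR into the YARN lib directory unless a file with the same name is already there.
install_shuffle_jar() {
  local jar="$1"
  local target_dir="${2:-/usr/local/service/hadoop/share/hadoop/yarn/lib}"
  [ -f "$jar" ] || { echo "JAR not found: $jar" >&2; return 1; }
  [ -d "$target_dir" ] || { echo "target directory not found: $target_dir" >&2; return 1; }
  if [ -f "$target_dir/$(basename "$jar")" ]; then
    echo "already present: $target_dir/$(basename "$jar")"
  else
    cp "$jar" "$target_dir/" && echo "copied: $jar -> $target_dir"
  fi
}

# Example usage on a node:
#   jar=$(find_shuffle_jar) && install_shuffle_jar "$jar"
```

Checking for an existing copy first makes the step safe to repeat if you lose track of which nodes are already done.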

Method 2. Use batch deployment script

You need to log in to any node (preferably a master one) in the EMR cluster. For more information on how to log in to EMR, please see Logging in to Linux Instances. Here, you can log in by using XShell.

Write one of the following shell scripts for batch file transfer. When a cluster has many nodes, you can use sshpass to avoid entering the password for every transfer. sshpass supplies the password non-interactively, so you do not need to type it repeatedly; however, the plaintext password is easily exposed and can be retrieved with the history command.

  1. Install sshpass to transfer files without password prompts.
    [root@172 ~]# yum install sshpass
    Write the following script:
    #!/bin/bash
    nodes=(ip1 ip2 … ipn) # IPs of all nodes in the cluster, separated by spaces
    len=${#nodes[@]}
    password="<your password>"
    file="spark-2.3.2-yarn-shuffle.jar"
    source_dir="/usr/local/service/spark/yarn"
    target_dir="/usr/local/service/hadoop/share/hadoop/yarn/lib"
    echo "$len" # Number of nodes to copy to
    for node in "${nodes[@]}"
    do
      echo "$node"
      sshpass -p "$password" scp "$source_dir/$file" "root@$node:$target_dir"
    done
  2. Transfer files without sshpass; scp prompts for the password of each node.
    Write the following script:
    #!/bin/bash
    nodes=(ip1 ip2 … ipn) # IPs of all nodes in the cluster, separated by spaces
    len=${#nodes[@]}
    file="spark-2.3.2-yarn-shuffle.jar"
    source_dir="/usr/local/service/spark/yarn"
    target_dir="/usr/local/service/hadoop/share/hadoop/yarn/lib"
    echo "$len" # Number of nodes to copy to
    for node in "${nodes[@]}"
    do
        echo "$node"
        scp "$source_dir/$file" "root@$node:$target_dir"
    done
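
To keep the password out of the script and out of the shell history, one option is the -e flag of sshpass, which reads the password from the SSHPASS environment variable. The following is a minimal sketch under that assumption; the function name copy_to_node and the DRY_RUN switch are hypothetical and exist only so that the command can be inspected before it is run.

```shell
#!/bin/bash
# Build the transfer command for one node and either print it (DRY_RUN=1) or run it.
# sshpass -e takes the password from the SSHPASS environment variable.
copy_to_node() {
  local node="$1" src="$2" dst="$3"
  local cmd=(sshpass -e scp "$src" "root@$node:$dst")
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "${cmd[*]}"
  else
    "${cmd[@]}"
  fi
}

# Example usage (read the password without echoing it, then copy to each node):
#   read -r -s -p "Password: " SSHPASS && export SSHPASS
#   for node in ip1 ip2; do
#     copy_to_node "$node" \
#       /usr/local/service/spark/yarn/spark-2.3.2-yarn-shuffle.jar \
#       /usr/local/service/hadoop/share/hadoop/yarn/lib
#   done
```

The password is still held in the environment of the current shell session, so this reduces exposure rather than eliminating it.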

Modifying YARN Configuration

  1. On the YARN component management page, select Configuration Management. Select the configuration file yarn-site.xml and select cluster as the dimension (modifications of configuration items in the cluster dimension are applied to all nodes in the cluster).
  2. Modify the yarn.nodemanager.aux-services configuration item and append spark_shuffle to its existing value, separated by a comma.
  3. Add the configuration item yarn.nodemanager.aux-services.spark_shuffle.class and set it to org.apache.spark.network.yarn.YarnShuffleService.
  4. Add the configuration item spark.yarn.shuffle.stopOnFailure and set it to false, so that a failure to initialize the Spark shuffle service does not stop the NodeManager.
  5. Save and distribute the settings. Restart the YARN component for the configuration to take effect.
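
After the settings are distributed, you may want to confirm on a node that yarn-site.xml actually contains them. The following is a rough sketch of such a check; the function name check_yarn_site is hypothetical, it uses plain text matching with grep rather than an XML parser, and it assumes each <value> element sits on a single line.

```shell
#!/bin/bash
# Report whether a yarn-site.xml file mentions the Spark shuffle service settings.
# Returns 0 when everything is found and 1 otherwise.
check_yarn_site() {
  local conf="$1" status=0 needle
  # Some <value> element should list spark_shuffle. This does not verify that
  # the element belongs to yarn.nodemanager.aux-services.
  if ! grep -q -E '<value>[^<]*spark_shuffle[^<]*</value>' "$conf"; then
    echo "missing: spark_shuffle inside a <value> element"
    status=1
  fi
  # The class property name and its value should both appear.
  for needle in \
      "yarn.nodemanager.aux-services.spark_shuffle.class" \
      "org.apache.spark.network.yarn.YarnShuffleService"; do
    if ! grep -q -F "$needle" "$conf"; then
      echo "missing: $needle"
      status=1
    fi
  done
  if [ "$status" -eq 0 ]; then
    echo "ok: $conf"
  fi
  return "$status"
}

# Example usage (the file location below is an assumption; adjust it to your cluster):
#   check_yarn_site /usr/local/service/hadoop/etc/hadoop/yarn-site.xml
```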

Modifying Spark Configuration

  1. In Component Management, find the Spark component and select Configure in the Operation column.

  2. Select the configuration file spark-defaults.conf, click Modify Configuration, and create configuration items as shown below:

    Configuration Item | Value | Remarks
    --- | --- | ---
    spark.shuffle.service.enabled | true | Enables the external shuffle service.
    spark.dynamicAllocation.enabled | true | Enables dynamic resource allocation.
    spark.dynamicAllocation.minExecutors | 1 | Minimum number of executors allocated to each application.
    spark.dynamicAllocation.maxExecutors | 30 | Maximum number of executors allocated to each application.
    spark.dynamicAllocation.initialExecutors | 1 | Generally the same as `spark.dynamicAllocation.minExecutors`.
    spark.dynamicAllocation.schedulerBacklogTimeout | 1s | If tasks have been pending for longer than this duration, new executors are requested.
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | 5s | If the backlog of pending tasks persists, executors are requested again at this interval. The number of executors requested per round grows exponentially compared with the previous round.
    spark.dynamicAllocation.executorIdleTimeout | 60s | If an executor has been idle for longer than this duration, the application removes it.
  3. Save and distribute the configuration and restart the component.
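
To try the same settings on a single job without editing spark-defaults.conf, they can be passed to spark-submit as --conf options, which take precedence over the defaults file. The following sketch only prints the resulting command; the array names are hypothetical, and the SparkPi job is borrowed from the test section of this document.

```shell
#!/bin/bash
# Settings from the table above, written as key=value pairs.
dyn_alloc_settings=(
  "spark.shuffle.service.enabled=true"
  "spark.dynamicAllocation.enabled=true"
  "spark.dynamicAllocation.minExecutors=1"
  "spark.dynamicAllocation.maxExecutors=30"
  "spark.dynamicAllocation.initialExecutors=1"
  "spark.dynamicAllocation.schedulerBacklogTimeout=1s"
  "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s"
  "spark.dynamicAllocation.executorIdleTimeout=60s"
)

# Expand each pair into a "--conf key=value" argument for spark-submit.
conf_args=()
for kv in "${dyn_alloc_settings[@]}"; do
  conf_args+=(--conf "$kv")
done

# Print the command instead of running it; remove "echo" to submit the job.
echo spark-submit "${conf_args[@]}" \
  --class org.apache.spark.examples.SparkPi --master yarn-client \
  examples/jars/spark-examples_2.11-2.3.2.jar 500
```

The YARN-side shuffle service configuration is still required on every NodeManager; these options only change the Spark side for one submission.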

Testing Dynamic Scheduling of Spark Resources

1. Resource configuration description of the testing environment

In the testing environment, NodeManager is deployed on two nodes, each with 4 CPU cores and 8 GB memory. The total resources of the cluster are therefore 8 CPU cores and 16 GB memory.

2. Testing job description

Test 1

  • In the EMR Console, enter the /usr/local/service/spark directory, switch to the "hadoop" user, and run spark-submit to submit a job. The data needs to be stored in HDFS.
    [root@172 ~]# cd /usr/local/service/spark/
    [root@172 spark]# su hadoop
    [hadoop@172 spark]$ hadoop fs -put ./README.md /
    [hadoop@172 spark]$ spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn-client --num-executors 10 --driver-memory 4g --executor-memory 4g --executor-cores 2 ./examples/jars/spark-examples_2.11-2.3.2.jar /README.md /output
  • In the "Application" panel of the WebUI of the YARN component, you can view the container and CPU allocation before and after the configuration.
  • Before dynamic resource scheduling is configured, at most 3 CPUs can be allocated.
  • After dynamic resource scheduling is configured, up to 5 CPUs can be allocated.

Conclusion: after dynamic resource scheduling is configured, the scheduler will allocate more resources based on the real-time needs of applications.
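
How quickly an application can grow follows from the backlog timeouts and the exponential request rule described in the Spark configuration table. The following is an idealized simulation using the values configured earlier (initial 1, maximum 30, first request after 1s, later requests every 5s). It assumes the task backlog never clears and ignores the limits imposed by actual task demand and by free YARN capacity; the function name ramp_up is hypothetical.

```shell
#!/bin/bash
# Simulate the executor target over time while a task backlog persists.
# Arguments: initial executors, maximum executors, first timeout (s), sustained timeout (s).
ramp_up() {
  local total="$1" max="$2" t="$3" sustained="$4"
  local add=1
  while [ "$total" -lt "$max" ]; do
    total=$(( total + add ))
    if [ "$total" -gt "$max" ]; then
      total="$max"        # Never exceed spark.dynamicAllocation.maxExecutors
    fi
    echo "t=${t}s target=${total}"
    add=$(( add * 2 ))    # Each round requests twice as many as the previous one
    t=$(( t + sustained ))
  done
}

ramp_up 1 30 1 5
# Prints:
# t=1s target=2
# t=6s target=4
# t=11s target=8
# t=16s target=16
# t=21s target=30
```

In the two-node test cluster the ceiling is set by available CPU and memory long before the configured maximum of 30 is reached.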

Test 2

  • In the EMR Console, enter the /usr/local/service/spark directory, switch to the "hadoop" user, and run spark-sql to start the interactive SparkSQL Console, which is set to use most of the resources in the testing cluster. Configure dynamic resource scheduling and check resource allocation before and after the configuration.
    [root@172 ~]# cd /usr/local/service/spark/
    [root@172 spark]# su hadoop
    [hadoop@172 spark]$ spark-sql --master yarn-client --num-executors 5 --driver-memory 4g --executor-memory 2g --executor-cores 1
  • Use the pi calculation example (SparkPi) that comes with Spark 2.3.2 as the testing job. When submitting the job, set the number of executors to 5, the driver memory to 4 GB, the executor memory to 4 GB, and the number of executor cores to 2.
    [root@172 ~]# cd /usr/local/service/spark/
    [root@172 spark]# su hadoop
    [hadoop@172 spark]$ spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 5 --driver-memory 4g --executor-memory 4g --executor-cores 2 examples/jars/spark-examples_2.11-2.3.2.jar 500
  • The resource utilization when only the SparkSQL job is running is 90.3%.
  • After the SparkPi job is submitted, the resource utilization of SparkSQL becomes 27.8%.

Conclusion: although the SparkSQL job requests a large amount of resources at submission, it executes no analysis jobs, so most of those resources sit idle. When an executor's idle time exceeds the limit set by spark.dynamicAllocation.executorIdleTimeout, the executor is released, and other jobs can obtain the resources. In this test, the cluster resource utilization of the SparkSQL job decreases from about 90% to about 28%, and the freed resources are allocated to the pi calculation job, which shows that dynamic scheduling is effective.

The value of the configuration item spark.dynamicAllocation.executorIdleTimeout determines how quickly resources are rescheduled. In the test, the time taken to reschedule resources was roughly equal to this value. We recommend adjusting it based on your actual needs.
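
When experimenting with this value, it can be convenient to override it for a single interactive session rather than changing the cluster-wide configuration. The following is a small sketch that prints such a spark-sql command; the function name spark_sql_with_idle_timeout is hypothetical, and the duration check accepts only a simple number-plus-unit form.

```shell
#!/bin/bash
# Print a spark-sql command that overrides the executor idle timeout for one session.
# The duration must be a number followed by a unit, such as 30s or 2min.
spark_sql_with_idle_timeout() {
  local timeout="$1"
  if ! [[ "$timeout" =~ ^[0-9]+(ms|s|m|min|h|d)$ ]]; then
    echo "invalid duration: $timeout" >&2
    return 1
  fi
  echo "spark-sql --master yarn-client --conf spark.dynamicAllocation.executorIdleTimeout=$timeout"
}

spark_sql_with_idle_timeout 30s
```

A shorter timeout returns idle executors to the cluster sooner, at the cost of requesting them again when the next query arrives.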