tencent cloud

文档反馈

管理 Spark 作业

最后更新时间:2023-12-27 15:05:35

    功能介绍

    EMR 容器版集群支持在控制台提交 Spark 作业,查看作业信息。
    注意
    提交作业的文件格式为 yaml,大小10M 以内。

    操作步骤

    1. 登录 EMR 容器版控制台,在集群列表中单击对应的集群 ID/名称进入集群详情页。
    2. 在集群详情页中单击作业管理,即可进行相关作业提交和查询。
    3. EMR 支持 CRD 方式提交作业,用户编写 yaml 作业文件后,由控制台进行作业提交。
    4. 单击作业列表上方的提交作业,打开提交作业弹框。选择需要提交的作业文件,单击确认后,即可提交作业。
    
    5. 单击作业列表中的详情,可跳转 Spark Historyserver UI 链接查看作业详情。
    6. 单击作业列表中的删除,打开删除弹框。在删除作业页面,确认需要删除的作业信息,确认无误后,单击确定,即可删除作业。

    作业示例

    通过 CRD 方式提交 Spark 作业的一般流程如下:
    1. 编写 Spark 程序。
    2. 编译打成 jar 包,将 jar 包放到 cos 或者 hdfs 等文件系统,或编写 Dockerfile 将 Spark 程序 jar 包打到镜像。
    3. 编写 yaml 文件,在控制台进行提交。
    下面将给出4种 Spark 作业示例:

    示例1. 使用 Spark 官方 jar 包

    新建 yaml 作业文件示例内容如下:
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: test1
    spec:
      hadoopConf:
        "fs.cosn.userinfo.secretId":"$SecretId"
        "fs.cosn.userinfo.secretKey":"$SecretKey"
      type: Scala
      mode: cluster
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.0.jar"
    本文示例中的参数描述,请参见 sparkoperator 。其中:
    apiVersion 和 kind 为 k8s 中资源种类和版本,此处不能更改。
    Metadata.name 定义作业名称,本文以 test1为例,用户可以自定义。
    Spec.hadoopConf 定义与 hadoop 相关配置信息,与 cos 交互需要配置密钥信息,可通过 API 密钥管理 获取密钥信息,代码中的 $SecretId$Secretkey 需要替换成业务对应的 SecretId 和 Secretkey。
    type 定义 spark 的程序种类,包括 Java、Scala、Python、R。本文以 Scala 为例,您可根据需要选填。
    mode 定义 sparkApplication 的部署模式,包括 cluster 和 client。本文以 cluster 为例,您可根据需要选填。
    driver 和 executor 分别定义 spark 的驱动器和执行器,由后台自动生成,其默认参数如下:
    driver:
      cores: 1
      memory: 512m
    executor:
      cores: 1
      instances: 2
      memory: 512m
    
    用户可以自定义 driver 和 executor 参数,补充在 yaml 作业文件示例1中,此时自定义参数会覆盖默认参数,示例如下:
    driver:
        cores: 1
        coreLimit: "1200m"
        memory: "512m"
      executor:
        cores: 1
        instances: 1
        memory: "512m"

    示例2. 自行编译打包并将 jar 包放到 cos(推荐)

    以下示例演示用户编写 spark 程序,打 jar 包,编写 yaml 作业进行提交的完整流程。
    1. 开发准备 因为任务中需要访问腾讯云对象存储(COS),所以需要在 COS 中存在一个存储桶(Bucket),该存储桶可以是创建集群时的存储桶,也可以创建新的存储桶(需与创建集群时选择的存储桶位于同一地域)。
    2. 使用 Maven 创建工程 在本次演示中,自己建立工程编译打包之后上传。推荐您使用 Maven 来管理您的工程。Maven 是一个项目管理工具,能够帮助您方便的管理项目的依赖信息。
    3. 编写 wordcount 程序,添加样例代码:
    import java.util.Arrays;
    import java.util.regex.Pattern;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;
    
    public class WordCountOnCos {
        private static final Pattern SPACE = Pattern.compile(" ");
        public static void main(String[] args){
            if (args.length < 1) {
                System.err.println("Usage: JavaWordCount <file>");
                System.exit(1);
            }
            SparkSession spark = SparkSession.builder().appName("wordCountOnCos").getOrCreate();
            JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
            JavaRDD<String> words = lines.flatMap(s -> Arrays.<String>asList(SPACE.split(s)).iterator());
            JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2(s, Integer.valueOf(1)));
            JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> Integer.valueOf(i1.intValue() + i2.intValue()));
            counts.saveAsTextFile(args[1]);
            spark.stop();
        }
    }
    4. 执行 mvn package 命令对整个工程进行打包。
    5. 将 jar 包上传到 cos 桶,编写 yaml 文件如下:
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: test2
    spec:
      hadoopConf:
        "fs.cosn.userinfo.secretId":"$SecretId"
        "fs.cosn.userinfo.secretKey":"$SecretKey"
      type: Java
      mode: cluster
      mainClass: com.tencent.WordCountOnCos
      mainApplicationFile: "cosn://kt-test-251007880/sparkapp/jar/wordcount.jar"
      arguments:
        - "cosn://kt-test-251007880/sparkapp/input/input"
        - "cosn://kt-test-251007880/sparkapp/output"
    
    其中 arguments 是传递给主类的参数,本例中表示 wordcount 程序的输入和输出目录。本文中 mainApplicationFile 和 wordcount 程序的输入和输出目录为示例,用户可以自定义。

    示例3. 自行编译打包并将 jar 包放到 HDFS

    用户编写 spark 程序,打 jar 包同示例2,然后将 jar 包上传到 HDFS,编写 yaml 文件如下:
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: test3
    spec:
      hadoopConf:
        "fs.cosn.userinfo.secretId":"$SecretId"
        "fs.cosn.userinfo.secretKey":"$SecretKey"
      type: Java
      mode: cluster
      mainClass: com.tencent.WordCountOnCos
      mainApplicationFile: "hdfs://$ip:$port/sparkapp/jar/wordcount.jar"
      arguments:
        - "cosn://kt-test-251007880/sparkapp/input/input"
        - "hdfs://$ip:$port/sparkapp/output"
    
    注意
    若使用 HFDS 存放 jar 包,HDFS 需要和容器集群位于同一 VPC。

    示例4. 自行编译打包并将 jar 包打到镜像

    用户编写 spark 程序,打 jar 包同示例2,然后建立 Dockerfile 如下:
    FROM ccr.ccs.tencentyun.com/emr-image/spark:BaseImage
    USER root
    RUN mkdir -p /sparkapp
    COPY jars/wordcount.jar /sparkapp
    ENTRYPOINT [ "/opt/entrypoint.sh" ]
    需要继承基础镜像 ccr.ccs.tencentyun.com/emr-image/spark:BaseImage,该镜像包含了与 cos 交互所需要的 jar 包。
    docker build -t ccr.ccs.tencentyun.com/emr-image/spark:wc -f ./bin/Dockerfile .
    编写作业 yaml 如下:
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: test4
    spec:
      hadoopConf:
        "fs.cosn.userinfo.secretId":"$SecretId"
        "fs.cosn.userinfo.secretKey":"$SecretKey"
      type: Java
      mode: cluster
      mainClass: com.tencent.WordCountOnCos
      image: ccr.ccs.tencentyun.com/emr-image/spark:wc
      mainApplicationFile: "local:///sparkapp/wordcount.jar"
      arguments:
        - "cosn://kt-test-251007880/sparkapp/input/input"
        - "cosn://kt-test-251007880/sparkapp/output"
    其中 image 为您打包的镜像,mainApplicationFile 是 jar 包在镜像中的路径。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持