Using Spark Python to Analyze Data in COS

Last updated: 2025-01-03 14:50:17
This section describes how to run a Spark word count application written in Python on data stored in COS.

Development Preparations

This task requires access to COS, so you need to create a bucket in COS first.
Create an EMR cluster. When creating the EMR cluster, you need to select the Spark component on the software configuration page and enable access to COS on the basic configuration page.

Data Preparations

First, upload the file to be processed to COS. If the file is in your local storage, upload it directly via the COS console; if it is in the EMR cluster, upload it by running the following Hadoop command:
[hadoop@10 hadoop]$ hadoop fs -put $testfile cosn://$bucketname/
Here, $testfile is the full path with file name and $bucketname is your bucket name. After the upload is completed, you can check whether the file is available in COS.

Running the Demo

First, log in to any node (preferably a master one) in the EMR cluster. For information about how to log in to EMR, see Logging in to Linux Instance Using Standard Login Method. Here, you can use WebShell to log in. Click Login on the right of the desired CVM instance to go to the login page. The default username is root, and the password is the one you set when creating the EMR cluster. Once your credentials are validated, you can enter the command line interface.
Run the following command on the EMR command-line interface to switch to the Hadoop user and go to the Spark installation directory /usr/local/service/spark:
[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark
Create a Python file named wordcount.py and add the following code:
from __future__ import print_function

import sys
from operator import add
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Expect exactly two arguments: the input file and the output directory.
    if len(sys.argv) != 3:
        print("Usage: wordcount <input> <output>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    # Read the input as lines of text and take the single string column of each row.
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    # Split each line on spaces, pair each word with 1, and sum the counts per word.
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)

    # Write the (word, count) pairs to the output directory.
    counts.saveAsTextFile(sys.argv[2])

    spark.stop()
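To see what the flatMap, map, and reduceByKey steps compute without a cluster, here is a minimal pure-Python sketch of the same logic applied to an in-memory list of lines. It is not Spark code; the helper name local_word_count and the sample lines are illustrative assumptions. It also shows that splitting on a single space yields empty strings when words are separated by more than one space, which is one way an empty-string key can end up in the results.

```python
from collections import Counter


def local_word_count(lines):
    """Pure-Python stand-in for flatMap(split) -> map((w, 1)) -> reduceByKey(add)."""
    counts = Counter()
    for line in lines:
        # str.split(' ') keeps empty strings between consecutive spaces,
        # unlike str.split() called with no argument.
        for word in line.split(' '):
            counts[word] += 1
    return dict(counts)


# Hypothetical sample input; note the double space in the first line.
sample = ["Hadoop  code", "both code"]
print(local_word_count(sample))
```

If empty-string keys are not wanted, one option is to split with x.split() (no argument) in the Spark job, which discards runs of whitespace.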
Submit the task by running the following command:
[hadoop@10 spark]$ ./bin/spark-submit --master yarn ./wordcount.py cosn://$bucketname/$yourtestfile cosn://$bucketname/$output
Here, $bucketname is your COS bucket name, $yourtestfile is the full path with test file name in the bucket, and $output is your output folder. If the $output folder already exists before the command is executed, the program will fail.
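Because the job fails when the output folder already exists, it can help to check for the folder before submitting. The sketch below wraps hadoop fs -test -e, which exits with status 0 when the path exists. The function names and the injectable run parameter are hypothetical, added so the logic can be exercised without a cluster; the sketch assumes that hadoop is on the PATH and that the current directory is the Spark installation directory.

```python
import subprocess


def output_exists(path, run=subprocess.call):
    """Return True if `hadoop fs -test -e <path>` exits with status 0."""
    return run(["hadoop", "fs", "-test", "-e", path]) == 0


def submit_args(bucket, input_key, output_dir):
    """Build the spark-submit argument list used in this section."""
    base = "cosn://{}/".format(bucket)
    return ["./bin/spark-submit", "--master", "yarn", "./wordcount.py",
            base + input_key, base + output_dir]


def submit_if_absent(bucket, input_key, output_dir, run=subprocess.call):
    """Submit the job only when the output path does not exist yet."""
    args = submit_args(bucket, input_key, output_dir)
    if output_exists(args[-1], run=run):
        raise RuntimeError("Output path already exists: " + args[-1])
    # Returns the exit status of spark-submit.
    return run(args)
```

On a cluster node, a call such as submit_if_absent("examplebucket", "input/test.txt", "output") would run the check and then the submission; the bucket and key names here are placeholders.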
After the program finishes, you can find the output files in the destination bucket:
[hadoop@172 spark]$ hadoop fs -ls cosn://$bucketname/$output
Found 2 items
-rw-rw-rw- 1 hadoop Hadoop 0 2018-06-29 15:35 cosn://$bucketname/$output/_SUCCESS
-rw-rw-rw- 1 hadoop Hadoop 2102 2018-06-29 15:34 cosn://$bucketname/$output/part-00000
You can also view the result by running the following command:
[hadoop@172 spark]$ hadoop fs -cat cosn://$bucketname/$output/part-00000
(u'', 27)
(u'code', 1)
(u'both', 1)
(u'Hadoop', 1)
(u'Bureau', 1)
(u'Department', 1)
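Each line in a part file is the text form of a Python tuple, so it can be read back with ast.literal_eval from the standard library. The following is a minimal sketch, assuming the lines have already been fetched (for example, from the hadoop fs -cat output); the function names parse_counts and top_words are hypothetical.

```python
import ast


def parse_counts(lines):
    """Parse lines such as "(u'code', 1)" into (word, count) tuples."""
    pairs = []
    for line in lines:
        line = line.strip()
        if line:  # skip blank lines
            word, count = ast.literal_eval(line)
            pairs.append((word, count))
    return pairs


def top_words(pairs, n):
    """Return the n most frequent (word, count) pairs, highest count first."""
    return sorted(pairs, key=lambda p: p[1], reverse=True)[:n]


# Sample lines copied from the output shown in this section.
sample = ["(u'', 27)", "(u'code', 1)", "(u'both', 1)"]
print(top_words(parse_counts(sample), 1))
```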
You can also output the result to HDFS by changing the output location in the command as follows:
[hadoop@10 spark]$ ./bin/spark-submit ./wordcount.py cosn://$bucketname/$yourtestfile /user/hadoop/$output
Here, /user/hadoop/ is a path in HDFS. If this path does not exist, create it first.
After the task is completed, you can view the Spark execution log by running the following command:
[hadoop@10 spark]$ /usr/local/service/hadoop/bin/yarn logs -applicationId $yourId
Here, replace $yourId with your task's application ID, which you can find in the YARN web UI.
