Dataprocの備忘録です。DataprocでGCSに配置したcsvファイルをDataFrameで読み込み分散並列処理する記事です。
簡単にDataprocを紹介
DataprocはGCP上でSparkやHadoopを実行できる環境を提供します。今回はDataprocを使ってPySparkを実行してみたいと思います。PySparkはRDDとDataframe型で実行できます。Pythonでデータ分析でpandasを使うのでDataframe型は扱いやすいです。Dataprocを使うメリットとしては実行環境だけでなく、GCPプロダクトとの統合やコスト面が優れています。通常SparkやHadoopを実行する場合、csvやテキストを分散ファイルシステムであるHDFSに置く必要が有りますが、GCSに配置し実行できます。サーバに配置しないことでジョブが実行したあと削除等も容易です。またコスト面でもジョブが完了したらクラスタを停止できるので、Pythonの並列処理でPoolを使う場合と比較するとインスタンスの無駄使いを防ぐことができます。DataFlowとの違いはDataprocがバッチ処理であること、GCEのサーバが必要な点です。SparkやHadoopの概要はこの記事が分かりやすいです。
qiita.com
事前準備
まず、GCPのプロジェクトIDと同じバケットを作り、sample2.csvを配置して下さい。
github.com
PySparkを実行
実際にありそうなケースだと特定の関数を並列処理させたい場合だと思います。次にCSVファイルをDataFrameで取り込みます。RDDではなくDataFrameとして扱うことで扱い安くすることができます。SSHでマスターノードにSSHし、Pysparkを実行していきます。
$ pyspark Python 2.7.13 (default, Sep 26 2018, 18:42:22) [GCC 6.3.0 20170516] on linux2 Type "help", "copyright", "credits" or "license" for more information. Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 19/09/24 18:48:41 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, c onfigure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.3.3 /_/ Using Python version 2.7.13 (default, Sep 26 2018 18:42:22) SparkSess
GCSに配置したcsvファイルを読み込みます。
from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("CountUniqueWords")\ .getOrCreate() project_id = '[ project id]' sp_df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/sample2.csv".format(project_id)) sp_df.show(5) +------+------+--------+----+----+----+----+ |label0|label1| cnt| _c3| _c4| _c5| _c6| +------+------+--------+----+----+----+----+ | 0| a|15920405|null|null|null|null| | 1| b|12033592|null|null|null|null| | 2| c| 853068|null|null|null|null| | 2| d| 608603|null|null|null|null| | 2| e| 940785|null|null|null|null| +------+------+--------+----+----+----+----+ only showing top 5 rows
条件に合致したデータを取得します。この辺はJavaScriptとも似てますね。
sp_df.filter('label0 > 1').show(5) +------+------+------+----+----+----+----+ |label0|label1| cnt| _c3| _c4| _c5| _c6| +------+------+------+----+----+----+----+ | 2| c|853068|null|null|null|null| | 2| d|608603|null|null|null|null| | 2| e|940785|null|null|null|null| | 2| f|775549|null|null|null|null| | 2| g|346132|null|null|null|null| +------+------+------+----+----+----+----+ only showing top 5 rows
次はUDFです。実際に並列処理させたい場合このようなケースが多いかと思います。
from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import * # func def ParallelAction(x): return int(x) * 5 udf = UserDefinedFunction(ParallelAction, IntegerType()) sp_df.filter('label0 > 1').withColumn("label2", udf("label0")).show(5) +------+------+------+----+----+----+----+------+ |label0|label1| cnt| _c3| _c4| _c5| _c6|label2| +------+------+------+----+----+----+----+------+ | 2| c|853068|null|null|null|null| 10| | 2| d|608603|null|null|null|null| 10| | 2| e|940785|null|null|null|null| 10| | 2| f|775549|null|null|null|null| 10| | 2| g|346132|null|null|null|null| 10| +------+------+------+----+----+----+----+------+ only showing top 5 rows
新しく作ったカラム「label2」に分散処理の実行結果が書き込まれています。
スクリプト
sample.py
from pyspark.sql import SparkSession from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import * # project id project_id = '[ project id]' spark = SparkSession\ .builder\ .appName("CountUniqueWords")\ .getOrCreate() sp_df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/sample2.csv".format(project_id)) # func def ParallelAction(x): return int(x) * 5 udf = UserDefinedFunction(ParallelAction, IntegerType()) sp_df.filter('label0 > 1').withColumn("label2", udf("label0")).show(5)
ジョブの実行。
$ spark-submit sample.py