case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

DataprocでPySparkの分散並列処理を行う方法

Dataprocの備忘録です。DataprocでGCSに配置したcsvファイルをDataFrameで読み込み分散並列処理する記事です。

簡単にDataprocを紹介

DataprocはGCP上でSparkやHadoopを実行できる環境を提供します。今回はDataprocを使ってPySparkを実行してみたいと思います。PySparkはRDDとDataframe型で実行できます。Pythonでデータ分析でpandasを使うのでDataframe型は扱いやすいです。Dataprocを使うメリットとしては実行環境だけでなく、GCPプロダクトとの統合やコスト面が優れています。通常SparkやHadoopを実行する場合、csvやテキストを分散ファイルシステムであるHDFSに置く必要が有りますが、GCSに配置し実行できます。サーバに配置しないことでジョブが実行したあと削除等も容易です。またコスト面でもジョブが完了したらクラスタを停止できるので、Pythonの並列処理でPoolを使う場合と比較するとインスタンスの無駄使いを防ぐことができます。DataFlowとの違いはDataprocがバッチ処理であること、GCEのサーバが必要な点です。SparkやHadoopの概要はこの記事が分かりやすいです。
qiita.com

事前準備

まず、GCPのプロジェクトIDと同じバケットを作り、sample2.csvを配置して下さい。
github.com

PySparkを実行

実際にありそうなケースだと特定の関数を並列処理させたい場合だと思います。次にCSVファイルをDataFrameで取り込みます。RDDではなくDataFrameとして扱うことで扱い安くすることができます。SSHでマスターノードにSSHし、Pysparkを実行していきます。

$ pyspark
Python 2.7.13 (default, Sep 26 2018, 18:42:22) 
[GCC 6.3.0 20170516] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/09/24 18:48:41 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, c
onfigure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.3
      /_/
Using Python version 2.7.13 (default, Sep 26 2018 18:42:22)
SparkSess

GCSに配置したcsvファイルを読み込みます。

from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("CountUniqueWords")\
        .getOrCreate()
project_id = '[ project id]'

sp_df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/sample2.csv".format(project_id))
sp_df.show(5)

+------+------+--------+----+----+----+----+
|label0|label1|     cnt| _c3| _c4| _c5| _c6|
+------+------+--------+----+----+----+----+
|     0|     a|15920405|null|null|null|null|
|     1|     b|12033592|null|null|null|null|
|     2|     c|  853068|null|null|null|null|
|     2|     d|  608603|null|null|null|null|
|     2|     e|  940785|null|null|null|null|
+------+------+--------+----+----+----+----+
only showing top 5 rows

条件に合致したデータを取得します。この辺はJavaScriptとも似てますね。

 sp_df.filter('label0 > 1').show(5)
+------+------+------+----+----+----+----+
|label0|label1|   cnt| _c3| _c4| _c5| _c6|
+------+------+------+----+----+----+----+
|     2|     c|853068|null|null|null|null|
|     2|     d|608603|null|null|null|null|
|     2|     e|940785|null|null|null|null|
|     2|     f|775549|null|null|null|null|
|     2|     g|346132|null|null|null|null|
+------+------+------+----+----+----+----+
only showing top 5 rows

次はUDFです。実際に並列処理させたい場合このようなケースが多いかと思います。

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

# func
def ParallelAction(x):
  return int(x) * 5

udf = UserDefinedFunction(ParallelAction, IntegerType())
sp_df.filter('label0 > 1').withColumn("label2", udf("label0")).show(5)

+------+------+------+----+----+----+----+------+                               
|label0|label1|   cnt| _c3| _c4| _c5| _c6|label2|
+------+------+------+----+----+----+----+------+
|     2|     c|853068|null|null|null|null|    10|
|     2|     d|608603|null|null|null|null|    10|
|     2|     e|940785|null|null|null|null|    10|
|     2|     f|775549|null|null|null|null|    10|
|     2|     g|346132|null|null|null|null|    10|
+------+------+------+----+----+----+----+------+
only showing top 5 rows

新しく作ったカラム「label2」に分散処理の実行結果が書き込まれています。
スクリプト
sample.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

# project id 
project_id = '[ project id]'

spark = SparkSession\
        .builder\
        .appName("CountUniqueWords")\
        .getOrCreate()

sp_df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("gs://{}/sample2.csv".format(project_id))

# func
def ParallelAction(x):
  return int(x) * 5

udf = UserDefinedFunction(ParallelAction, IntegerType())
sp_df.filter('label0 > 1').withColumn("label2", udf("label0")).show(5)

ジョブの実行。

$ spark-submit sample.py

所感

MLモデルのデータパイプラインを作る上ではDataprocよりDataflowの方がサーバレスでバッチ・ストリーミング処理を読み込み先を変えるだけで良いのでいいと思います。しかし、実行結果をインタラクティブに確認したい場合や作成したCSVを一度サーバ環境に吐き出したい場合は実行結果を即時確認できる点でDataprocはDataflowより良いと思いました。

参考
qiita.com
qiita.com
blog.amedama.jp