今回はDataproc クラスタ上でジョブを実行し、GCS内のデータを加工したいと思います。
- 本記事の目的
- 本記事の用途
- 事前準備[ Dataprocクラスタ, GCSバケット ]
- PySparkで簡単なジョブを実行
- Pig ジョブの実行
- 出力ファイルの取得
- GCS内のデータを活用してPySparkジョブを実行
- おわりに
本記事の目的
本記事は環境構築を数分で行いたく、サーバのメモリ容量に依存せず、大規模データの前処理を行いたいデータサイエンティストを対象としています。
データサイエンティストの仕事の大部分は、データの評価や加工など「前処理」です。前処理ができれば、モデルの構築などはPythonのライブラリが行ってくれます。既存データを加工しない基礎分析であればPythonの分析ツールJupyterを活用すればできます。しかし、大規模データを加工したいケース、例えば1億レコード全てに新しくカラムを追加したいときなど、Jupyterサーバ上で行うには、よほど高価なサーバでない限りメモリ不足に陥ります。このようなケースでは大規模データを分散処理フレームワーク[HadoopやSpark]で処理し、バケット[GCSやS3]に格納した後、JupyterやDWH[Treasure dataやGoogle BigQuery]に取り込みます。データサイエンティストの大半はデータを加工するためのPythonの知識はあると思います。Dataprocで簡単に環境構築し、大規模データの前処理を行いたいケースで本記事はお役に立てるのではないかと思います。
本記事の用途
専門用語が飛び交うので、事前に用語について簡単に記載させて頂きます。
Hadoopとは
大規模データを取り扱う分散処理フレームワークのことです。安価なサーバを簡単に増やすことのできるスケールアウト方式かつ、クラスタの各ノードの管理も行ってくれます。大規模データの取り扱いに特化した分散型のフレームワークです。
HDFS(Hadoop File System)とは
分散バッチ処理ソフト「Apache Hadoop」向けのファイルシステムであり、1台のサーバのストレージに収まりきらない大容量のデータを、何台かのサーバに分割して配置し管理するための仕組みとなります。ファイルを分割して複数のディスクで管理します。データを複数のディスクから並行して読み、処理の多重度を上げることで大量データ処理のスループット[単位時間あたりの処理能力]を引き上げます。
Sparkとは
Hadoopよりも高速に処理できる分散処理を行うフレームワークのことです。Hadoopはストレージで管理しているため大規模データを処理するのに時間がかかります。より高速に大規模データを処理するために、ストレージではなくインメモリで高速処理できるようSparkが作られました。インメモリで管理しているためCPUから直接データを読み込むことができます。データベースでいうとストレージに書き込むMySQLがHadoop、メモリ上で管理するRedisがSparkに該当します。
Apache Pigとは
Hadoopでのクエリを実行するための言語で、Hadoop上でデータ操作を行う言語です。同様にHiveがありますが違いとしては手軽に使えて複雑な処理も可能であり柔軟性が高い、ですがHiveに比べるとパフォーマンスは低めです。
ジョブとは
クラスタ上で実行される計算のことです。単純なプログラムの実行もジョブと呼びます。
本記事ではジョブを実行し、前処理を行いたいと思います。
事前準備[ Dataprocクラスタ, GCSバケット ]
ジョブ実行のための事前準備を行います。まず、GCPからDataprocクラスタとGCPバケットを作成して下さい。
Dataprocクラスタ構築
Dataprocのクラスタ構築済みで話を進めさせて頂ければと思いますので、まだの方は事前に構築頂ければと思います。
case-k.hatenablog.com
GCSバケットを作成
ジョブの実行結果を保存するためのバケットを作成したいと思います。
GCPのコンソールを開き、右上からSSHウィンドウを開いて下さい。
以下のコマンドを実行し、GCS内にバケットを作成して下さい。
バケットには、クラスタと同じリージョンにある「プロジェクトID」と同じ名前を付けます
gsutil mb -c regional -l us-central1 gs://$DEVSHELL_PROJECT_ID
ウェブ コンソールのメニューから、「Storage」に移動します。
バケットが作成されたことを確認します。
PySparkで簡単なジョブを実行
まずはPySpark Shellを開き、PytSparkの簡単なジョブを実行したいと思います。
ジョブ実行までの以下となります。
処理フロー
1. 実行ファイルの準備
2. PySpark Shellを開く
3. PySparkジョブの実行
1. 実行ファイルの準備
まずは、gitからクローンし、クローンしたフィアルをGCSにコピーします。
以下のコマンドを実行して下さい。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd training-data-analyst/courses/unstructured ./replace_and_upload.sh <YOUR-BUCKET-NAME>
2. PySpark Shellを開く
Dataprocクラスタに移動し、「VMインスタンス」をクリックして、クラスタ内のマシンリストを表示して下さい。
「マスターノード」をクリックして詳細を確認後、「SSH」ボタンをクリックしSSHウィンドウを立ち上げて下さい。
コマンドプロンプトより「pyspark」と入力し、PySpark Shellを開きます。
3. PySparkジョブの実行
とても簡単なジョブを実行してみます。
data = [0, 1, 2, 3, 4, 5] print data # Out [0, 1, 2, 3, 4, 5]
Sparkを終了します。
quit()
Pig ジョブの実行
次にDataprocが提供するHDFS クラスタを活用してPigジョブを実行したいと思います。
冒頭に記載しましたが、PigはHadoopのデータ操作を行う言語です。
処理フロー
1. GCSからファイルをHDFSに移動
2. テキストファイルをHDFSにコピー
3. Pigジョブの実行
4. HDFS内でファイルの生成
5. 出力ファイルの取得
実際の運用でありそうなケースですね。GCSに格納されている大規模データを分散処理で処理したいと思います。
1. GCSからファイルをHDFSに移動
まずSSHウィンドウにGCS内のファイルを移動させるディレクトリを作ります。
その後、HDFSに移動させたいと思います
mkdir lab cd lab # データファイルとPigスクリプトを移動します。 gsutil -m cp gs://バケット名/unstructured/pet-details.* .
gsutil コマンドは、Google Cloud Storage (GCS) を操作するためのコマンドラインツールのことです。
2. テキストファイルをHDFSにコピー
hadoop fs -mkdir /pet-details hadoop fs -put pet-details.txt /pet-details
hdfsシェルコマンド
http://www.mwsoft.jp/programming/hadoop/hdfs_shell.html
移動が完了したら、DataprocクラスタのIPアドレスにHDFSのポート「:9870」を付けてHadoopの管理画面を確認してみましょう。
右側の「Utilities」メニューから、「Browse the file system」を選択します。
pet-details という名前のフォルダがあることと、その中に pet-details.txt というファイルがあることを確認します。
これでジョブを実行する準備が整いました。
それではいよいよジョブを実行したいと思います。
3. Pigジョブの実行
準備が整いましたのでSSHウィンドウでPigを実行したいと思います。
pig < pet-details.pig
ジョブが完了したら、再びHadoopの管理画面を開いてみましょう。
GroupedByTypeフォルダが出来上がりました。
出力ファイルの取得
HDFSに出力されたファイルをSSHウィンドウに取得したいと思います。
mkdir ~/lab2/output cd ~/lab2/output hadoop fs -get /GroupedByType/part* . # 出力の確認 cat *
Hadoopで大規模データを加工したい場合、このような方法で行うと良いと思います。
GCS内のデータを活用してPySparkジョブを実行
先ほど、Pigのジョブを実行するため、前準備としてHDFSにGCSから取得したデータをコピーした後、Pigでジョブを実行しました。
このセクションではGCS内のファイルに対して直接、PySparkジョブを実行したいと思います。
実際のユースケースとしては、GCS内の大規模データを加工したい時に活用します。
処理フロー
1. GCSにファイルをコピー
2. PySparkジョブの実行
1. GCSにファイルをコピー
以下のコマンドを実行し、ファイルの取得と取得したデータを作成済みのGCSに保存します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd training-data-analyst/courses/unstructured ./replace_and_upload.sh <YOUR-BUCKET-NAME>
無事GCSのバケットにコピーできたか確認してみます。
gsutil ls gs://dataproc-f3c8d810-fd8c-4892-b5d6-c127a70d8636-us/unstructured/
問題なくGCSにファイルをコピーできました。
コンソールから「ストレージ」の「バケット」をクリックいただいても確認することができます。
2. PySparkジョブの実行
それではPySparkファイルを実行し、GCS内でデータを加工してみましょう。
事前にPySparkのスクリプトを作成し実行します。
Dataprocクラスタにはデータもコピーせずに出力結果のみ表示させます。まず、今回対処となるデータを確認したいと思います。
バケットにコピーした「lab2-input.txt 」を確認してみて下さい。カンマ区切りのキーと値のリストが確認できるかと思います。
# lab2-input.txt
Dog,Noir
Dog,Bree
Dog,Pickles
Dog,Sparky
Cat,Tom
Cat,Alley
Cat,Cleo
Frog,Kermit
Pig,Bacon
Pig,Babe
Dog,Gigi
Cat,George
Frog,Hoppy
Pig,Tasty
Dog,Fred
Cat,Suzy
PySparkジョブの実行ファイル、「lab2.py」も確認してみたいと思います。
from pyspark import SparkContext sc = SparkContext("local") file = sc.textFile("gs://dataproc-f3c8d810-fd8c-4892-b5d6-c127a70d8636-us/unstructured/lab2-input.txt") dataLines = file.map(lambda s: s.split(",")).map(lambda x : (x[0], [x[1]])) print dataLines.take(100) databyKey = dataLines.reduceByKey(lambda a, b: a + b) print databyKey.take(100) countByKey = databyKey.map(lambda (k,v): (k, len(v))) print countByKey.take(100)
これらのファイルはGCS上に格納されており、クラスタ上にはありません。
ウェブコンソールから「Dataproc」に移動し、今回は「ジョブ」を選択し、「ジョブ送信」をクリックして下さい。
ジョブ設定画面で以下の設定をして下さい。
・ジョブタイプ:PySpark
・メインの Python ファイル:gs://
ジョブの設定が完了したら設定画面したの「送信」をクリックして下さい。
ジョブが完了したらジョブIDをクリックして詳細を確認してみて下さい。
実行結果を確認することができます。もう一度同様の処理を行いたい場合はクローンを行って下さい。
gcloud dataproc jobs submit pyspark \ --cluster my-cluster gs://<YOUR-BUCKET-NAME>/unstructured/lab2.py
おわりに
今回はDataprocクラスタを活用したジョブの実行を行いました。データ分析処理は前処理がもっとも大変と言われています。大規模データの加工にはDataprocを活用し、ジョブを実行して行いたいと思います。
参考記事
Hadoop HDFS
https://www.graffe.jp/blog/1156
分散処理に入門してみた(Hadoop + Spark) | キャスレーコンサルティング株式会社