この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。
はじめに
データアナリティクス事業本部ビッグデータチームのyosh-kです。
今回は、M1 MacでPySpark環境を構築し簡単なCSV読み込みとDataFrameのShowコマンドで出力結果を確認するところまでをまとめていきたいと思います。
環境
- macOS Monterey バージョン12.4
- M1チップ
- Python 3.10.4
- zsh
Apache Sparkとは
前提としてApache SparkとPysparkの関係性について簡単に理解します。
Apache Sparkとは複数台のノードで分散処理を行い、ビッグデータを分析するためのフレームワークです。
PySparkとは
SparkはScalaで実装されていますが、それをPythonで実行できるようにSparkはPython用のAPIを提供しています。
このAPIのことをPySparkといいます。
Pyspark環境構築
Javaインストール
PySparkの裏側ではJavaが動いているのでJava11をインストールします。
brew install java11
出力結果に表示されるように環境変数の設定が必要なので設定を行います。
If you need to have openjdk@11 first in your PATH, run:
echo 'export PATH="/opt/homebrew/opt/openjdk@11/bin:$PATH"' >> ~/.zshrc
For compilers to find openjdk@11 you may need to set:
export CPPFLAGS="-I/opt/homebrew/opt/openjdk@11/include"
echo 'export PATH="/opt/homebrew/opt/openjdk@11/bin:$PATH"' >> ~/.zshrc
export CPPFLAGS="-I/opt/homebrew/opt/openjdk@11/include"
設定を反映。
source ~/.zshrc
Sparkのインストール
以下のコマンドでSparkをインストール、環境設定を行います。
brew install apache-spark
echo "export SPARK_HOME=/opt/homebrew/opt/apache-spark/libexec/" >> ~/.zshrc
echo "export PATH=${PATH}:${SPARK_HOME}/" >> ~/.zshrc
設定を反映。
source ~/.zshrc
ログ設定
Sparkのイベントログを格納するためのディレクトリを作成します。
mkdir /tmp/spark-events
Pysparkの実行
findsparkライブラリのインストール
PySparkを起動しなくても、SPARK_HOMEの環境変数を読み込んで、すぐにSparkを使えるようにしてくれるライブラリをインストールします。
pip install findspark
以下のソースコードを実行し動作確認します。
import findspark
#.init()で$SPARK_HOMEのパスを自動的に読み込む
findspark.init()
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
#spark sessionの作成
spark = SparkSession.builder \
.appName("test") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.sql.session.timeZone", "JST") \
.config("spark.ui.enabled","true") \
.config("spark.eventLog.enabled","true") \
.enableHiveSupport() \
.getOrCreate()
spark.sql("show tables").show()
spark.stop()
spark.sparkContext.stop()
show tableの実行結果が空で表示されればOKです!
(blog_env) @ 03 % python pyspark_test.py
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
22/07/03 17:47:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/03 17:47:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/07/03 17:47:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/07/03 17:47:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/07/03 17:47:33 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/07/03 17:47:33 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
22/07/03 17:47:33 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+
最後にテスト用のCSVファイルを作成し、CSV読み込みとDataFrameのshowコマンドで出力できるか確認します。
import findspark
#.init()で$SPARK_HOMEのパスを自動的に読み込む
findspark.init()
#pysparkに必要なライブラリを読み込む
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
#spark sessionの作成
spark = SparkSession.builder \
.appName("test") \
.config("hive.exec.dynamic.partition", "true") \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.config("spark.sql.session.timeZone", "JST") \
.config("spark.ui.enabled","true") \
.config("spark.eventLog.enabled","true") \
.enableHiveSupport() \
.getOrCreate()
struct = StructType([
StructField("test_column_A", StringType(), False),
StructField("test_column_B", StringType(), False),
])
# csvデータ読み込み
df_csv = spark.read.option("multiline", "true").option("encoding", "UTF-8") \
.csv("test.csv", header=False, sep=',', inferSchema=False, schema=struct)
df_csv.show()
spark.stop()
spark.sparkContext.stop()
テストの用のCSVファイルが以下のように読み込めていればOKです!
(blog_env)03 % python pyspark_test.py
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition.mode
Warning: Ignoring non-Spark config property: hive.exec.dynamic.partition
22/07/03 20:13:20 WARN Utils: Your hostname,local resolves to a loopback address: 127.0.0.1; instead (on interface en0)
22/07/03 20:13:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/03 20:13:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-------------+-------------+
|test_column_A|test_column_B|
+-------------+-------------+
| test1| test2|
| test3| test4|
+-------------+-------------+
最後に
PySparkのローカル環境構築はできたものの、そもそものApache Spark、分散処理などの基礎的なことやDataFrameの実装方法など理解していないことはたくさんあるので、一つずつまとめていきたいと思います。
私のように初めて環境構築する人にとって少しでも助けとなれば幸いです。
参考文献
PySparkとは?
PySpark Documentation
Apache Sparkに手を出してヤケドしないための基本