PySparkのファイル読み込み方法 readメソッドとoptions【CSV, TSV, JSON, Excel, TXT, PDF】

PySparkでファイルからデータを読み取り、データフレームに格納する機会は多いので、そこの解説をします。

というか自分のための備忘録だったりしますが…。

CSV, TSV, JSON, Excel, テキストファイルを読み込むことは割とあるかと思います。

一応PDFファイルからも読み込むことがあるかもしれませんので、そちらも記載していますがあまり使う機会はないかと思います。笑

CSVファイル、TSVファイル

かなりの頻度で登場するのがCSVファイルやTSVファイルではないでしょうか。

上記ファイルの読み込みはSparkSessionのreadメソッドを使います。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/users.csv"
options = {
    "header" : "true",
    "encoding" : "UTF-8"
}
df = spark.read.format("csv").options(**options).load(file_path)
df.show()

以下、表示結果。

※中身は事前に生成したダミーデータになります。

+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
|undefinedID|     NAME|           KANA|GENDER|AGE|     BIRTH|POST_CODE|      PW|         CREATED_AT|RESINGED_FLG|
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
|          1|江本 春江|  エモト ハルエ|    女| 33|1990/11/25| 045-8460|nqlN7JTE|2020/12/20 23:57:45|       False|
|          2|三角 信男|  ミスミ ノブオ|    男| 37| 1987/3/10| 344-5757|leQMQglu| 2012/10/16 0:43:59|        True|
|          3|木内 芳明|キウチ ヨシアキ|    男| 22|2001/10/18| 323-2640|jcZySr5c|  1993/8/25 9:25:22|        True|
|          4|斉藤 真琴|サイトウ マコト|    女| 54| 1969/9/18| 613-1299|etq7Bc1U|  1986/1/3 14:11:58|        True|
|          5|郡司 克美|  グンジ カツミ|    女| 36|  1987/7/1| 460-7192|YVkybPmB| 1986/4/14 22:04:16|        True|
|          6|竹島 花梨|タケシマ カリン|    女| 31| 1993/3/11| 941-5533|eCDDOfQO|1978/10/13 18:33:03|        True|
|          7|高城 大和|  タカギ ヤマト|    男| 45|  1978/9/2| 558-4101|PjYmbpBV| 1987/11/3 23:59:05|       False|
|          8|  大澤 理|オオサワ サトル|    男| 41| 1982/11/2| 119-0620|OC_CIlUf| 2007/9/26 17:23:22|        True|
|          9|八田 真結|    ハッタ マユ|    女| 56|  1968/4/1| 022-3795|QM8Fpgln|  2003/7/21 1:23:03|       False|
|         10|  丹羽 遥|  タンバ ハルカ|    女| 30|1993/11/24| 172-0053|h8g4Z1Zz|  2006/5/3 10:36:05|        True|

詳細は後述しますが、それぞれoptionを指定するか**kwargsなので、dictionaryで渡さずにそのまま引数を指定しても大丈夫です。

今回であれば以下の指定をしています。

  • ヘッダーあり
  • 文字コードはUTF-8
  • CSVファイル

ご存知だとは思いますが、CSVファイルはComma Separated Valuesの頭文字を取った名称で、TSVはカンマ(Comma )ではなくタブ(Tab)ということになります。

TSVファイルの読み込みはformatの指定はそのまま”csv”として、optionに区切り文字の指定を付け加えます。

逆にformatを”tsv”とするとエラーになります。

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.tsv"
options = {
    "header" : "true",
    "encoding" : "UTF-8",
    "delimiter" : "\t" # タブ区切り
}
# format("csv")はそのまま
df = spark.read.format("csv").options(**options).load(file_path)
df.show()

簡単ですね。

JSONファイル

JSONファイルはデータを構造化して保存および交換するための軽量で使いやすいフォーマットです。

WEB開発では頻繁に登場するので、覚えておきたいのですがSparkで読み込む場合は結構癖があります。

例えば以下のようなJSONファイルを読み込んでみましょう。

{
  "name": "John",
  "age": 30,
  "isStudent": false,
  "courses": ["Math", "Science", "English"],
  "address": {
      "street": "123 Main St",
      "city": "Anytown"
  }
}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.json"
df = spark.read.option("multiline","true") \
        .json(file_path)

df.show()

以下、表示結果。

+--------------------+---+--------------------+---------+----+
|             address|age|             courses|isStudent|name|
+--------------------+---+--------------------+---------+----+
|{Anytown, 123 Mai...| 30|[Math, Science, E...|    false|John|
+--------------------+---+--------------------+---------+----+

ネストされているため、読み込んだ後にSparkのデータフレームを加工してあげる必要があります。

これについてはパターンによって指定方法がいくつかあり、長くなるので別記事でJSONファイルの読み込みの記事で説明します。

ちなみにmultilineを指定しないとエラーになります。(データが1つだけならエラーにならないとは思いますが)

TXTファイル

TXTファイルも一応載せておきますが、めちゃくちゃシンプルです。

※サンプルデータです。

江本 春江
斉藤 真琴
郡司 克美
丹羽 遥
三角 信男
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.txt"
df = spark.read.text(file_path)
df.show()
+---------+
|    value|
+---------+
|江本 春江|
|斉藤 真琴|
|郡司 克美|
|  丹羽 遥|
|三角 信男|
+---------+

今回optionは指定していませんが、デフォルトで以下のようになっています。

  • wholeText : false # -> 複数のファイルを1つの行として読み込むかどうか
  • lineSep : \n # -> 行の区切り文字の指定
  • encoding : UTF-8 # -> テキストファイルの文字コード

Excelファイル

意外とあるのがExcelファイルをデータマートに取り込むというパターン。

from pyspark.sql import SparkSession

# Sparkセッションの作成
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()

# Excelファイルのパスを指定
excel_file_path = "files/user.xlsx"

# Excelファイルを読み込む
df = spark.read.format("com.crealytics.spark.excel") \
    .option("dataAddress", "Sheet1") \ # 読み込みたいシートを指定します。
    .option("header", "true") \ # ヘッダー行がある場合は"true"を指定します。
    .load(excel_file_path)

# 読み込んだデータを表示
df.show()

以下、表示結果。

+-----------+---------------------+----+--------------+----+------+--------------------+------------+-------------+--------+---------------------------------+------------------------------+----------------+--------+------------+
|       氏名|     氏名(ひらがな)|年齢|      生年月日|性別|血液型|      メールアドレス|    電話番号| 携帯電話番号|郵便番号|                             住所|                        会社名|クレジットカード|有効期限|マイナンバー|
+-----------+---------------------+----+--------------+----+------+--------------------+------------+-------------+--------+---------------------------------+------------------------------+----------------+--------+------------+
|    原 康一|        はら こういち|  36|1987年08月29日|  男|     A|harakoichi@exampl...|0799-61-7563|080-6649-6335|658-3571|     兵庫県神戸市北区君影町3-4-10|                  株式会社大貴|4411380709528245|   04/25| 33266687414|
|    中田 遼|        なかた りょう|  30|1993年12月06日|  男|     A|ryounakata@exampl...|0495-90-0887|050-5661-1460|343-8632|            埼玉県川口市戸塚1-5-9|            株式会社フリーダム|5513902851619540|   02/27| 73744403542|
|    伊東 薫|        いとう かおる|  72|1952年01月06日|  女|     B|ito_kaoru@example...|0573-03-0810|090-1908-4284|502-6297|           岐阜県岐阜市高田3-4-19|                           NaN|3547016981049323|   11/28| 66745256300|
|佐久間 綾乃|        さくま あやの|  24|2000年02月26日|  女|     A|ayano_sakuma@exam...|03-1960-5294|080-4563-9097|152-5358|            東京都新宿区新宿1-5-6|              株式会社エーワン|5474585880140922|   06/28|269242167251|

PDFファイル

PDFファイルを読み込む機会はあまりないかもしれませんが、おまけとして書いておきます。

といっても、pdfplumberを使ってPDFファイルからリストで取得し、そのリストをSparkのデータフレームに格納しているだけです。笑

読み込むのは以下のPDFデータになります。

!pip install pdfplumber
import pdfplumber
from pyspark.sql import SparkSession

# Sparkセッションの作成
spark = SparkSession.builder \
    .appName("PDF_Example") \
    .getOrCreate()

# PDFファイルのパスを指定
pdf_file_path = "files/dummy_data.pdf"

# PDFファイルからデータを抽出
data = []
with pdfplumber.open(pdf_file_path) as pdf:
    for page in pdf.pages:
        # テキストを抽出
        text = page.extract_text()
        # テキストを改行で分割してリストに追加
        lines = text.split('\n')
        data.extend(lines)
# DataFrameの作成
df = spark.createDataFrame(data, "string")

# DataFrameの表示
df.show()

以下、表示結果。

+------------------+
|             value|
+------------------+
|   ダミーPDFデータ|
|以下サンプルデータ|
|      ① あいうえお|
|      ② かきくけこ|
|      ③ さしすせそ|
+------------------+

きちんと表になっているPDFファイルを使いたかったんですが、ささっと作成してしまったため簡単な内容になってしまいました。

optionについて

最後にoptionについての説明です。

optionの指定方法はいくつかあるので、まずはそれぞれ実装例を紹介します。

CSVファイルを例に説明しますが、以下の3つの方法があります。

  1. optionsにdictionaryで指定する方法
  2. optionsにキーワード引数で指定する方法
  3. optionで一つずつ指定する方法
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/users.csv"

# ① -> dictionaryを設定して、引数に渡す方法
options = {
    "header" : "true",
    "encoding" : "UTF-8"
}
df = spark.read.format("csv").options(**options).load(file_path)

# ② -> optionsにキーワード引数を指定する方法
df = spark.read.format("csv").options(
header="true", encoding="UTF-8").load(file_path)

# ③ -> optionで一つずつ指定する方法
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("encoding", "UTF-8") \
    .load(file_path)

個人的によく見かけるのは③の記述方法ですが、私としては①の書き方が好きです。

なんとなく見やすいのというのが理由ですが、どれを使うかは現場や周りに合わせるのがよいかと思います。

optionの種類

optionで指定できる種類をざっと説明しますが、正直このへんは公式見た方が良いと思います。(https://spark.apache.org/docs/3.5.1/sql-data-sources-csv.html

バージョンによっては使えなかったり、新たに追加されたり、実装方法が変更になったりするからです。

といってもファイルの読み込み時に、よく使うoptionだけでも紹介しておきます。

プロパティ名デフォルト値説明
encodingUTF-8CSVファイルの読み込み時に指定されたエンコーディングでファイルをデコードします。日本語などのマルチバイト文字を含むファイルを扱う場合、適切なエンコーディングを指定することが重要です。
sep,各フィールドと値の間に使うセパレーターを設定します。カンマ(,)以外の文字をセパレーターとして指定したい場合に使用します。
headerfalse読み込み時にファイルの最初の行をヘッダー(列名)として扱うかどうかを指定します。ヘッダーがある場合はtrueに設定します。
inferSchemafalseデータからスキーマ(列のデータ型)を自動的に推測するかどうかを指定します。推測を行うとデータの読み込みに余分な処理が発生します。また独自にスキーマを指定することも可能です。
nullValuenullNULL値として扱う文字列を設定します。データに含まれる特定の文字列をNULLとして扱いたい場合に使用します。
quote引用符を設定します。データ内でセパレーターをエスケープするために使用される文字です。値の中にセパレーターが含まれる場合は、引用符で囲むことでその値をエスケープします。
escape\引用符やセパレーターをエスケープするために使用される文字を設定します。引用符の中に引用符を含む場合、エスケープ文字で引用符をエスケープします。
dateFormatyyyy-MM-dd日付形式を指定します。異なるフォーマットの日付データを適切に読み込むために、ファイル内の日付形式に合わせて指定します。
compressionnone圧縮されたCSVファイルを読み込む場合、使用されている圧縮コーデックを指定します。大容量データを扱う場合は結構使うかも。
ignoreTrailingWhiteSpacefalse読み込み時に各フィールドの末尾にある余分な空白を無視するかどうかを指定します。フィールド内に余分な空白がある場合、データのクリーンアップに役立ちます。

だいたいこのへん使えればとくに困らないかなーと思ってます。