PySparkでファイルからデータを読み取り、データフレームに格納する機会は多いので、そこの解説をします。
というか自分のための備忘録だったりしますが…。
CSV, TSV, JSON, Excel, テキストファイルを読み込むことは割とあるかと思います。
一応PDFファイルからも読み込むことがあるかもしれませんので、そちらも記載していますがあまり使う機会はないかと思います。笑
CSVファイル、TSVファイル
かなりの頻度で登場するのがCSVファイルやTSVファイルではないでしょうか。
上記ファイルの読み込みはSparkSessionのreadメソッドを使います。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/users.csv"
options = {
"header" : "true",
"encoding" : "UTF-8"
}
df = spark.read.format("csv").options(**options).load(file_path)
df.show()
以下、表示結果。
※中身は事前に生成したダミーデータになります。
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
|undefinedID| NAME| KANA|GENDER|AGE| BIRTH|POST_CODE| PW| CREATED_AT|RESINGED_FLG|
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
| 1|江本 春江| エモト ハルエ| 女| 33|1990/11/25| 045-8460|nqlN7JTE|2020/12/20 23:57:45| False|
| 2|三角 信男| ミスミ ノブオ| 男| 37| 1987/3/10| 344-5757|leQMQglu| 2012/10/16 0:43:59| True|
| 3|木内 芳明|キウチ ヨシアキ| 男| 22|2001/10/18| 323-2640|jcZySr5c| 1993/8/25 9:25:22| True|
| 4|斉藤 真琴|サイトウ マコト| 女| 54| 1969/9/18| 613-1299|etq7Bc1U| 1986/1/3 14:11:58| True|
| 5|郡司 克美| グンジ カツミ| 女| 36| 1987/7/1| 460-7192|YVkybPmB| 1986/4/14 22:04:16| True|
| 6|竹島 花梨|タケシマ カリン| 女| 31| 1993/3/11| 941-5533|eCDDOfQO|1978/10/13 18:33:03| True|
| 7|高城 大和| タカギ ヤマト| 男| 45| 1978/9/2| 558-4101|PjYmbpBV| 1987/11/3 23:59:05| False|
| 8| 大澤 理|オオサワ サトル| 男| 41| 1982/11/2| 119-0620|OC_CIlUf| 2007/9/26 17:23:22| True|
| 9|八田 真結| ハッタ マユ| 女| 56| 1968/4/1| 022-3795|QM8Fpgln| 2003/7/21 1:23:03| False|
| 10| 丹羽 遥| タンバ ハルカ| 女| 30|1993/11/24| 172-0053|h8g4Z1Zz| 2006/5/3 10:36:05| True|
詳細は後述しますが、それぞれoptionを指定するか**kwargsなので、dictionaryで渡さずにそのまま引数を指定しても大丈夫です。
今回であれば以下の指定をしています。
- ヘッダーあり
- 文字コードはUTF-8
- CSVファイル
ご存知だとは思いますが、CSVファイルはComma Separated Valuesの頭文字を取った名称で、TSVはカンマ(Comma )ではなくタブ(Tab)ということになります。
TSVファイルの読み込みはformatの指定はそのまま”csv”として、optionに区切り文字の指定を付け加えます。
逆にformatを”tsv”とするとエラーになります。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.tsv"
options = {
"header" : "true",
"encoding" : "UTF-8",
"delimiter" : "\t" # タブ区切り
}
# format("csv")はそのまま
df = spark.read.format("csv").options(**options).load(file_path)
df.show()
簡単ですね。
JSONファイル
JSONファイルはデータを構造化して保存および交換するための軽量で使いやすいフォーマットです。
WEB開発では頻繁に登場するので、覚えておきたいのですがSparkで読み込む場合は結構癖があります。
例えば以下のようなJSONファイルを読み込んでみましょう。
{
"name": "John",
"age": 30,
"isStudent": false,
"courses": ["Math", "Science", "English"],
"address": {
"street": "123 Main St",
"city": "Anytown"
}
}
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.json"
df = spark.read.option("multiline","true") \
.json(file_path)
df.show()
以下、表示結果。
+--------------------+---+--------------------+---------+----+
| address|age| courses|isStudent|name|
+--------------------+---+--------------------+---------+----+
|{Anytown, 123 Mai...| 30|[Math, Science, E...| false|John|
+--------------------+---+--------------------+---------+----+
ネストされているため、読み込んだ後にSparkのデータフレームを加工してあげる必要があります。
これについてはパターンによって指定方法がいくつかあり、長くなるので別記事でJSONファイルの読み込みの記事で説明します。
ちなみにmultilineを指定しないとエラーになります。(データが1つだけならエラーにならないとは思いますが)
TXTファイル
TXTファイルも一応載せておきますが、めちゃくちゃシンプルです。
※サンプルデータです。
江本 春江
斉藤 真琴
郡司 克美
丹羽 遥
三角 信男
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/user.txt"
df = spark.read.text(file_path)
df.show()
+---------+
| value|
+---------+
|江本 春江|
|斉藤 真琴|
|郡司 克美|
| 丹羽 遥|
|三角 信男|
+---------+
今回optionは指定していませんが、デフォルトで以下のようになっています。
wholeText
: false
# -> 複数のファイルを1つの行として読み込むかどうかlineSep
: \n # -> 行の区切り文字の指定encoding
: UTF-8 # -> テキストファイルの文字コード
Excelファイル
意外とあるのがExcelファイルをデータマートに取り込むというパターン。
from pyspark.sql import SparkSession
# Sparkセッションの作成
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
# Excelファイルのパスを指定
excel_file_path = "files/user.xlsx"
# Excelファイルを読み込む
df = spark.read.format("com.crealytics.spark.excel") \
.option("dataAddress", "Sheet1") \ # 読み込みたいシートを指定します。
.option("header", "true") \ # ヘッダー行がある場合は"true"を指定します。
.load(excel_file_path)
# 読み込んだデータを表示
df.show()
以下、表示結果。
+-----------+---------------------+----+--------------+----+------+--------------------+------------+-------------+--------+---------------------------------+------------------------------+----------------+--------+------------+
| 氏名| 氏名(ひらがな)|年齢| 生年月日|性別|血液型| メールアドレス| 電話番号| 携帯電話番号|郵便番号| 住所| 会社名|クレジットカード|有効期限|マイナンバー|
+-----------+---------------------+----+--------------+----+------+--------------------+------------+-------------+--------+---------------------------------+------------------------------+----------------+--------+------------+
| 原 康一| はら こういち| 36|1987年08月29日| 男| A|harakoichi@exampl...|0799-61-7563|080-6649-6335|658-3571| 兵庫県神戸市北区君影町3-4-10| 株式会社大貴|4411380709528245| 04/25| 33266687414|
| 中田 遼| なかた りょう| 30|1993年12月06日| 男| A|ryounakata@exampl...|0495-90-0887|050-5661-1460|343-8632| 埼玉県川口市戸塚1-5-9| 株式会社フリーダム|5513902851619540| 02/27| 73744403542|
| 伊東 薫| いとう かおる| 72|1952年01月06日| 女| B|ito_kaoru@example...|0573-03-0810|090-1908-4284|502-6297| 岐阜県岐阜市高田3-4-19| NaN|3547016981049323| 11/28| 66745256300|
|佐久間 綾乃| さくま あやの| 24|2000年02月26日| 女| A|ayano_sakuma@exam...|03-1960-5294|080-4563-9097|152-5358| 東京都新宿区新宿1-5-6| 株式会社エーワン|5474585880140922| 06/28|269242167251|
PDFファイル
PDFファイルを読み込む機会はあまりないかもしれませんが、おまけとして書いておきます。
といっても、pdfplumberを使ってPDFファイルからリストで取得し、そのリストをSparkのデータフレームに格納しているだけです。笑
読み込むのは以下のPDFデータになります。
!pip install pdfplumber
import pdfplumber
from pyspark.sql import SparkSession
# Sparkセッションの作成
spark = SparkSession.builder \
.appName("PDF_Example") \
.getOrCreate()
# PDFファイルのパスを指定
pdf_file_path = "files/dummy_data.pdf"
# PDFファイルからデータを抽出
data = []
with pdfplumber.open(pdf_file_path) as pdf:
for page in pdf.pages:
# テキストを抽出
text = page.extract_text()
# テキストを改行で分割してリストに追加
lines = text.split('\n')
data.extend(lines)
# DataFrameの作成
df = spark.createDataFrame(data, "string")
# DataFrameの表示
df.show()
以下、表示結果。
+------------------+
| value|
+------------------+
| ダミーPDFデータ|
|以下サンプルデータ|
| ① あいうえお|
| ② かきくけこ|
| ③ さしすせそ|
+------------------+
きちんと表になっているPDFファイルを使いたかったんですが、ささっと作成してしまったため簡単な内容になってしまいました。
optionについて
最後にoptionについての説明です。
optionの指定方法はいくつかあるので、まずはそれぞれ実装例を紹介します。
CSVファイルを例に説明しますが、以下の3つの方法があります。
- optionsにdictionaryで指定する方法
- optionsにキーワード引数で指定する方法
- optionで一つずつ指定する方法
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sparkReadApp").getOrCreate()
file_path = "files/users.csv"
# ① -> dictionaryを設定して、引数に渡す方法
options = {
"header" : "true",
"encoding" : "UTF-8"
}
df = spark.read.format("csv").options(**options).load(file_path)
# ② -> optionsにキーワード引数を指定する方法
df = spark.read.format("csv").options(
header="true", encoding="UTF-8").load(file_path)
# ③ -> optionで一つずつ指定する方法
df = spark.read.format("csv") \
.option("header", "true") \
.option("encoding", "UTF-8") \
.load(file_path)
個人的によく見かけるのは③の記述方法ですが、私としては①の書き方が好きです。
なんとなく見やすいのというのが理由ですが、どれを使うかは現場や周りに合わせるのがよいかと思います。
optionの種類
optionで指定できる種類をざっと説明しますが、正直このへんは公式見た方が良いと思います。(https://spark.apache.org/docs/3.5.1/sql-data-sources-csv.html)
バージョンによっては使えなかったり、新たに追加されたり、実装方法が変更になったりするからです。
といってもファイルの読み込み時に、よく使うoptionだけでも紹介しておきます。
プロパティ名 | デフォルト値 | 説明 |
encoding | UTF-8 | CSVファイルの読み込み時に指定されたエンコーディングでファイルをデコードします。日本語などのマルチバイト文字を含むファイルを扱う場合、適切なエンコーディングを指定することが重要です。 |
sep | , | 各フィールドと値の間に使うセパレーターを設定します。カンマ(, )以外の文字をセパレーターとして指定したい場合に使用します。 |
header | false | 読み込み時にファイルの最初の行をヘッダー(列名)として扱うかどうかを指定します。ヘッダーがある場合はtrue に設定します。 |
inferSchema | false | データからスキーマ(列のデータ型)を自動的に推測するかどうかを指定します。推測を行うとデータの読み込みに余分な処理が発生します。また独自にスキーマを指定することも可能です。 |
nullValue | null | NULL値として扱う文字列を設定します。データに含まれる特定の文字列をNULLとして扱いたい場合に使用します。 |
quote | “ | 引用符を設定します。データ内でセパレーターをエスケープするために使用される文字です。値の中にセパレーターが含まれる場合は、引用符で囲むことでその値をエスケープします。 |
escape | \ | 引用符やセパレーターをエスケープするために使用される文字を設定します。引用符の中に引用符を含む場合、エスケープ文字で引用符をエスケープします。 |
dateFormat | yyyy-MM-dd | 日付形式を指定します。異なるフォーマットの日付データを適切に読み込むために、ファイル内の日付形式に合わせて指定します。 |
compression | none | 圧縮されたCSVファイルを読み込む場合、使用されている圧縮コーデックを指定します。大容量データを扱う場合は結構使うかも。 |
ignoreTrailingWhiteSpace | false | 読み込み時に各フィールドの末尾にある余分な空白を無視するかどうかを指定します。フィールド内に余分な空白がある場合、データのクリーンアップに役立ちます。 |
だいたいこのへん使えればとくに困らないかなーと思ってます。