第5章:データ処理
更新日:2025年12月9日
1. NumPy
1.1 内部構造
NumPyはPythonにおける数値計算の基盤ライブラリである[1]。ndarrayは連続したメモリ領域にデータを格納し、C言語レベルの高速な演算を可能にする。
1.1.1 ndarrayの構造:ndarrayは、データバッファ、dtype(データ型)、shape(形状)、strides(ストライド)から構成される。
import numpy as np
arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.int64)
print(f"dtype: {arr.dtype}") # int64
print(f"shape: {arr.shape}") # (2, 3)
print(f"strides: {arr.strides}") # (24, 8) - 各次元で何バイト移動するか
print(f"itemsize: {arr.itemsize}") # 8 bytes per element
print(f"nbytes: {arr.nbytes}") # 48 bytes total
print(f"flags:\n{arr.flags}") # C_CONTIGUOUS: True など
1.1.2 メモリレイアウト:C順序(行優先)とFortran順序(列優先)がある。
# C順序(行優先)- デフォルト
c_arr = np.array([[1, 2, 3], [4, 5, 6]], order='C')
print(c_arr.strides) # (24, 8) - 行方向に24byte、列方向に8byte
# Fortran順序(列優先)
f_arr = np.array([[1, 2, 3], [4, 5, 6]], order='F')
print(f_arr.strides) # (8, 16) - 行方向に8byte、列方向に16byte
# 操作によるメモリ連続性の変化
transposed = c_arr.T
print(transposed.flags['C_CONTIGUOUS']) # False
print(transposed.flags['F_CONTIGUOUS']) # True
Fig. 1にndarrayのメモリ構造を示す。
1.2 ベクトル化
ベクトル化(Vectorization)は、ループを使わずに配列全体に対して演算を行う手法である。Pythonループと比較して10〜100倍の高速化が期待できる。
import numpy as np
import time
n = 1_000_000
a = np.random.rand(n)
b = np.random.rand(n)
# アンチパターン: Pythonループ
def python_loop(a, b):
result = np.zeros(len(a))
for i in range(len(a)):
result[i] = a[i] + b[i]
return result
# 推奨: ベクトル化演算
def vectorized(a, b):
return a + b
# 性能比較
start = time.time()
python_loop(a, b)
print(f"Python loop: {time.time() - start:.4f}s")
start = time.time()
vectorized(a, b)
print(f"Vectorized: {time.time() - start:.4f}s")
# 典型的な結果:
# Python loop: 0.3500s
# Vectorized: 0.0015s (約230倍高速)
1.2.1 ブロードキャスティング:形状の異なる配列間での演算を可能にする仕組み。
# ブロードキャスティングの例
arr = np.array([[1, 2, 3],
[4, 5, 6]]) # shape: (2, 3)
# スカラーとの演算
print(arr * 2) # 全要素に2を掛ける
# 1次元配列との演算
row = np.array([10, 20, 30]) # shape: (3,)
print(arr + row) # 各行にrowを加算
# [[11, 22, 33],
# [14, 25, 36]]
col = np.array([[100], [200]]) # shape: (2, 1)
print(arr + col) # 各列にcolを加算
# [[101, 102, 103],
# [204, 205, 206]]
1.2.2 ユニバーサル関数(ufunc):要素ごとの演算を高速に行う関数群。
# ufuncの例
arr = np.array([1, 4, 9, 16, 25])
print(np.sqrt(arr)) # [1. 2. 3. 4. 5.]
print(np.exp(arr)) # 指数関数
print(np.log(arr)) # 自然対数
print(np.sin(arr)) # 正弦関数
# 条件付き演算
print(np.where(arr > 10, arr, 0)) # [0 0 0 16 25]
# 集約関数
print(np.sum(arr)) # 55
print(np.mean(arr)) # 11.0
print(np.std(arr)) # 8.0
print(np.cumsum(arr)) # [ 1 5 14 30 55]
2. Pandas最適化
Pandasは表形式データを扱うデファクトスタンダードだが、大規模データでは最適化が必要になる[2]。
2.1 データ型の最適化:適切なデータ型を選択することでメモリ使用量を削減。
import pandas as pd
import numpy as np
# サンプルデータ
df = pd.DataFrame({
'id': range(1_000_000),
'category': np.random.choice(['A', 'B', 'C'], 1_000_000),
'value': np.random.rand(1_000_000) * 100,
'count': np.random.randint(0, 1000, 1_000_000)
})
print(f"最適化前: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")
# データ型の最適化
df_optimized = df.copy()
df_optimized['id'] = df_optimized['id'].astype('int32')
df_optimized['category'] = df_optimized['category'].astype('category')
df_optimized['value'] = df_optimized['value'].astype('float32')
df_optimized['count'] = df_optimized['count'].astype('int16')
print(f"最適化後: {df_optimized.memory_usage(deep=True).sum() / 1e6:.1f} MB")
# 典型的な結果:
# 最適化前: 72.0 MB
# 最適化後: 11.5 MB (約84%削減)
Table 1. Pandasデータ型とメモリ使用量
| データ型 | バイト数 | 用途 |
|---|---|---|
| int8 / uint8 | 1 | -128〜127 / 0〜255 |
| int16 / uint16 | 2 | -32768〜32767 / 0〜65535 |
| int32 / uint32 | 4 | 約±21億 / 0〜約42億 |
| float32 | 4 | 単精度浮動小数点 |
| category | 可変 | カテゴリカル(少数の値が繰り返す場合) |
2.2 チェーンメソッドとquery:可読性と性能を両立する書き方。
# メソッドチェーン
result = (
df
.query('value > 50')
.assign(doubled=lambda x: x['count'] * 2)
.groupby('category')
.agg({
'value': ['mean', 'std'],
'doubled': 'sum'
})
)
# queryメソッドは文字列評価だが内部的に最適化されている
# 変数を使う場合は@プレフィックス
threshold = 50
df.query('value > @threshold')
2.3 apply vs ベクトル化:applyは遅いため、可能な限りベクトル化演算を使用する。
import pandas as pd
import numpy as np
df = pd.DataFrame({'a': range(100000), 'b': range(100000)})
# 遅い: apply
%timeit df.apply(lambda row: row['a'] + row['b'], axis=1)
# 約 2.5秒
# 速い: ベクトル化
%timeit df['a'] + df['b']
# 約 0.5ミリ秒 (5000倍高速)
# 条件付き処理もベクトル化
# 遅い
%timeit df['a'].apply(lambda x: x * 2 if x > 50000 else x)
# 速い
%timeit np.where(df['a'] > 50000, df['a'] * 2, df['a'])
3. Polars
PolarsはRust製の高速データフレームライブラリである[3]。Pandasと比較して数倍〜数十倍の性能を発揮し、遅延評価によるクエリ最適化を行う。
3.1 基本操作:
import polars as pl
# データフレーム作成
df = pl.DataFrame({
'id': range(1_000_000),
'category': ['A', 'B', 'C'] * 333333 + ['A'],
'value': [i * 0.1 for i in range(1_000_000)]
})
# 基本操作
result = (
df
.filter(pl.col('value') > 50000)
.with_columns([
(pl.col('value') * 2).alias('doubled'),
pl.col('category').cast(pl.Categorical)
])
.group_by('category')
.agg([
pl.col('value').mean().alias('mean_value'),
pl.col('doubled').sum().alias('total_doubled'),
pl.count().alias('count')
])
.sort('mean_value', descending=True)
)
print(result)
3.2 遅延評価(Lazy API):クエリを最適化してから実行する。
# 遅延評価モード
lazy_df = pl.scan_csv('large_file.csv') # ファイルをスキャン
result = (
lazy_df
.filter(pl.col('value') > 100)
.group_by('category')
.agg(pl.col('value').mean())
.collect() # ここで実行
)
# クエリプランの確認
print(lazy_df.filter(pl.col('value') > 100).explain())
Table 2. Pandas vs Polars
| 観点 | Pandas | Polars |
|---|---|---|
| 実装言語 | Python/Cython | Rust |
| 評価方式 | 即時評価 | 遅延評価(Lazy)対応 |
| 並列処理 | 限定的 | 自動並列化 |
| メモリ効率 | 中程度 | 高い(Arrow形式) |
| エコシステム | 非常に豊富 | 成長中 |
| 学習コスト | 情報が豊富 | 新しいAPI習得が必要 |
3.3 Pandasとの相互変換:
# Pandas -> Polars
import pandas as pd
pandas_df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
polars_df = pl.from_pandas(pandas_df)
# Polars -> Pandas
back_to_pandas = polars_df.to_pandas()
# NumPy配列との連携
import numpy as np
arr = polars_df['a'].to_numpy()
4. 大規模データ処理
メモリに収まらない大規模データを処理するパターンを解説する。
4.1 チャンク処理:データを分割して逐次処理。
import pandas as pd
# CSVをチャンクで読み込み
chunk_size = 100_000
results = []
for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
# 各チャンクを処理
processed = chunk.groupby('category')['value'].sum()
results.append(processed)
# 結果を集約
final_result = pd.concat(results).groupby(level=0).sum()
# Polarsでのストリーミング処理
result = (
pl.scan_csv('huge_file.csv')
.group_by('category')
.agg(pl.col('value').sum())
.collect(streaming=True) # ストリーミングモード
)
4.2 Dask:Pandasを並列・分散処理に拡張[4]。
import dask.dataframe as dd
# 大規模CSVの読み込み(遅延評価)
ddf = dd.read_csv('huge_file_*.csv') # 複数ファイルも対応
# Pandas風のAPI
result = (
ddf
.groupby('category')
.agg({'value': 'mean'})
.compute() # ここで実行
)
# 並列処理の設定
from dask.distributed import Client
client = Client(n_workers=4) # 4ワーカーで並列処理
4.3 データフォーマット選択:Table 3に用途別の推奨フォーマットを示す。
Table 3. データフォーマットの比較
| フォーマット | 特徴 | 推奨用途 |
|---|---|---|
| CSV | 可読性、互換性 | データ交換、小規模データ |
| Parquet | 列指向、圧縮、高速 | 分析用途、大規模データ |
| Arrow/Feather | ゼロコピー、言語間共有 | プロセス間通信 |
| HDF5 | 階層構造、部分読み込み | 科学計算、時系列 |
| SQLite | クエリ、トランザクション | 構造化データ、更新頻繁 |
# Parquetの使用例
import pandas as pd
# 書き込み(圧縮込み)
df.to_parquet('data.parquet', compression='snappy')
# 読み込み(特定列のみ)
df = pd.read_parquet('data.parquet', columns=['id', 'value'])
# Polarsでの高速読み込み
df = pl.read_parquet('data.parquet')
# 複数Parquetファイルの読み込み
df = pl.scan_parquet('data/*.parquet').collect()
Fig. 2にデータ処理ライブラリの選択フローを示す。
5. データバリデーション
データの品質を保証するためのバリデーション手法を解説する。
5.1 Pydanticによるレコードバリデーション:第2章で紹介したPydanticは、データレコードのバリデーションに有効。
from pydantic import BaseModel, field_validator, ValidationError
from datetime import date
from typing import Literal
class SalesRecord(BaseModel):
id: int
date: date
category: Literal['A', 'B', 'C']
amount: float
quantity: int
@field_validator('amount')
@classmethod
def amount_must_be_positive(cls, v):
if v < 0:
raise ValueError('amount must be non-negative')
return v
@field_validator('quantity')
@classmethod
def quantity_must_be_positive(cls, v):
if v <= 0:
raise ValueError('quantity must be positive')
return v
# バリデーション
try:
record = SalesRecord(
id=1,
date='2024-01-15',
category='A',
amount=100.0,
quantity=5
)
except ValidationError as e:
print(e.json())
5.2 panderaによるDataFrameバリデーション:DataFrameスキーマを定義してバリデーション[5]。
import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema
# スキーマ定義
schema = DataFrameSchema({
'id': Column(int, Check.greater_than(0)),
'category': Column(str, Check.isin(['A', 'B', 'C'])),
'value': Column(float, Check.in_range(0, 1000)),
'date': Column(pa.DateTime),
})
# データ検証
df = pd.DataFrame({
'id': [1, 2, 3],
'category': ['A', 'B', 'C'],
'value': [10.5, 20.3, 30.1],
'date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03'])
})
validated_df = schema.validate(df)
# デコレータとしても使用可能
@pa.check_input(schema)
def process_data(df: pd.DataFrame) -> pd.DataFrame:
return df.groupby('category')['value'].mean()
5.3 クラスベースのスキーマ定義:より型安全なアプローチ。
import pandera as pa
from pandera.typing import DataFrame, Series
class SalesSchema(pa.DataFrameModel):
id: Series[int] = pa.Field(gt=0)
category: Series[str] = pa.Field(isin=['A', 'B', 'C'])
value: Series[float] = pa.Field(ge=0, le=1000)
date: Series[pa.DateTime]
class Config:
strict = True # 定義外の列を許可しない
coerce = True # 型変換を試みる
# 型ヒントとしても機能
def process_sales(df: DataFrame[SalesSchema]) -> DataFrame[SalesSchema]:
return df[df['value'] > 100]
# バリデーション実行
validated = SalesSchema.validate(df)
References
[1] NumPy, "NumPy Documentation," numpy.org, 2024.
[2] pandas, "pandas documentation," pandas.pydata.org, 2024.
[3] Polars, "Polars User Guide," docs.pola.rs, 2024.
[4] Dask, "Dask Documentation," docs.dask.org, 2024.
[5] pandera, "pandera Documentation," pandera.readthedocs.io, 2024.
本コンテンツは2025年12月時点の情報に基づいて作成されている。各ライブラリのAPIは変更される可能性があるため、最新の公式ドキュメントを参照されたい。性能比較は環境により異なる。