第5章:データ処理

更新日:2025年12月9日

本章では、Pythonにおけるデータ処理の基盤技術を解説する。NumPyの内部構造とベクトル化演算、Pandasの最適化手法、新世代のPolarsによる高速処理、大規模データ処理のパターン、Pydantic/panderaによるデータバリデーションについて学ぶ。適切なライブラリ選択と最適化により、数百倍の性能差が生じることもある。

1. NumPy

1.1 内部構造

NumPyはPythonにおける数値計算の基盤ライブラリである[1]。ndarrayは連続したメモリ領域にデータを格納し、C言語レベルの高速な演算を可能にする。

1.1.1 ndarrayの構造:ndarrayは、データバッファ、dtype(データ型)、shape(形状)、strides(ストライド)から構成される。

import numpy as np

arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.int64)

print(f"dtype: {arr.dtype}")        # int64
print(f"shape: {arr.shape}")        # (2, 3)
print(f"strides: {arr.strides}")    # (24, 8) - 各次元で何バイト移動するか
print(f"itemsize: {arr.itemsize}")  # 8 bytes per element
print(f"nbytes: {arr.nbytes}")      # 48 bytes total
print(f"flags:\n{arr.flags}")       # C_CONTIGUOUS: True など

1.1.2 メモリレイアウト:C順序(行優先)とFortran順序(列優先)がある。

# C順序(行優先)- デフォルト
c_arr = np.array([[1, 2, 3], [4, 5, 6]], order='C')
print(c_arr.strides)  # (24, 8) - 行方向に24byte、列方向に8byte

# Fortran順序(列優先)
f_arr = np.array([[1, 2, 3], [4, 5, 6]], order='F')
print(f_arr.strides)  # (8, 16) - 行方向に8byte、列方向に16byte

# 操作によるメモリ連続性の変化
transposed = c_arr.T
print(transposed.flags['C_CONTIGUOUS'])  # False
print(transposed.flags['F_CONTIGUOUS'])  # True

Fig. 1にndarrayのメモリ構造を示す。

1.2 ベクトル化

ベクトル化(Vectorization)は、ループを使わずに配列全体に対して演算を行う手法である。Pythonループと比較して10〜100倍の高速化が期待できる。

import numpy as np
import time

n = 1_000_000
a = np.random.rand(n)
b = np.random.rand(n)

# アンチパターン: Pythonループ
def python_loop(a, b):
    result = np.zeros(len(a))
    for i in range(len(a)):
        result[i] = a[i] + b[i]
    return result

# 推奨: ベクトル化演算
def vectorized(a, b):
    return a + b

# 性能比較
start = time.time()
python_loop(a, b)
print(f"Python loop: {time.time() - start:.4f}s")

start = time.time()
vectorized(a, b)
print(f"Vectorized: {time.time() - start:.4f}s")

# 典型的な結果:
# Python loop: 0.3500s
# Vectorized: 0.0015s (約230倍高速)

1.2.1 ブロードキャスティング:形状の異なる配列間での演算を可能にする仕組み。

# ブロードキャスティングの例
arr = np.array([[1, 2, 3],
                [4, 5, 6]])  # shape: (2, 3)

# スカラーとの演算
print(arr * 2)  # 全要素に2を掛ける

# 1次元配列との演算
row = np.array([10, 20, 30])  # shape: (3,)
print(arr + row)  # 各行にrowを加算
# [[11, 22, 33],
#  [14, 25, 36]]

col = np.array([[100], [200]])  # shape: (2, 1)
print(arr + col)  # 各列にcolを加算
# [[101, 102, 103],
#  [204, 205, 206]]

1.2.2 ユニバーサル関数(ufunc):要素ごとの演算を高速に行う関数群。

# ufuncの例
arr = np.array([1, 4, 9, 16, 25])

print(np.sqrt(arr))      # [1. 2. 3. 4. 5.]
print(np.exp(arr))       # 指数関数
print(np.log(arr))       # 自然対数
print(np.sin(arr))       # 正弦関数

# 条件付き演算
print(np.where(arr > 10, arr, 0))  # [0 0 0 16 25]

# 集約関数
print(np.sum(arr))       # 55
print(np.mean(arr))      # 11.0
print(np.std(arr))       # 8.0
print(np.cumsum(arr))    # [ 1  5 14 30 55]

2. Pandas最適化

Pandasは表形式データを扱うデファクトスタンダードだが、大規模データでは最適化が必要になる[2]。

2.1 データ型の最適化:適切なデータ型を選択することでメモリ使用量を削減。

import pandas as pd
import numpy as np

# サンプルデータ
df = pd.DataFrame({
    'id': range(1_000_000),
    'category': np.random.choice(['A', 'B', 'C'], 1_000_000),
    'value': np.random.rand(1_000_000) * 100,
    'count': np.random.randint(0, 1000, 1_000_000)
})

print(f"最適化前: {df.memory_usage(deep=True).sum() / 1e6:.1f} MB")

# データ型の最適化
df_optimized = df.copy()
df_optimized['id'] = df_optimized['id'].astype('int32')
df_optimized['category'] = df_optimized['category'].astype('category')
df_optimized['value'] = df_optimized['value'].astype('float32')
df_optimized['count'] = df_optimized['count'].astype('int16')

print(f"最適化後: {df_optimized.memory_usage(deep=True).sum() / 1e6:.1f} MB")

# 典型的な結果:
# 最適化前: 72.0 MB
# 最適化後: 11.5 MB (約84%削減)

Table 1. Pandasデータ型とメモリ使用量

データ型 バイト数 用途
int8 / uint8 1 -128〜127 / 0〜255
int16 / uint16 2 -32768〜32767 / 0〜65535
int32 / uint32 4 約±21億 / 0〜約42億
float32 4 単精度浮動小数点
category 可変 カテゴリカル(少数の値が繰り返す場合)

2.2 チェーンメソッドとquery:可読性と性能を両立する書き方。

# メソッドチェーン
result = (
    df
    .query('value > 50')
    .assign(doubled=lambda x: x['count'] * 2)
    .groupby('category')
    .agg({
        'value': ['mean', 'std'],
        'doubled': 'sum'
    })
)

# queryメソッドは文字列評価だが内部的に最適化されている
# 変数を使う場合は@プレフィックス
threshold = 50
df.query('value > @threshold')

2.3 apply vs ベクトル化:applyは遅いため、可能な限りベクトル化演算を使用する。

import pandas as pd
import numpy as np

df = pd.DataFrame({'a': range(100000), 'b': range(100000)})

# 遅い: apply
%timeit df.apply(lambda row: row['a'] + row['b'], axis=1)
# 約 2.5秒

# 速い: ベクトル化
%timeit df['a'] + df['b']
# 約 0.5ミリ秒 (5000倍高速)

# 条件付き処理もベクトル化
# 遅い
%timeit df['a'].apply(lambda x: x * 2 if x > 50000 else x)

# 速い
%timeit np.where(df['a'] > 50000, df['a'] * 2, df['a'])

3. Polars

PolarsはRust製の高速データフレームライブラリである[3]。Pandasと比較して数倍〜数十倍の性能を発揮し、遅延評価によるクエリ最適化を行う。

3.1 基本操作

import polars as pl

# データフレーム作成
df = pl.DataFrame({
    'id': range(1_000_000),
    'category': ['A', 'B', 'C'] * 333333 + ['A'],
    'value': [i * 0.1 for i in range(1_000_000)]
})

# 基本操作
result = (
    df
    .filter(pl.col('value') > 50000)
    .with_columns([
        (pl.col('value') * 2).alias('doubled'),
        pl.col('category').cast(pl.Categorical)
    ])
    .group_by('category')
    .agg([
        pl.col('value').mean().alias('mean_value'),
        pl.col('doubled').sum().alias('total_doubled'),
        pl.count().alias('count')
    ])
    .sort('mean_value', descending=True)
)

print(result)

3.2 遅延評価(Lazy API):クエリを最適化してから実行する。

# 遅延評価モード
lazy_df = pl.scan_csv('large_file.csv')  # ファイルをスキャン

result = (
    lazy_df
    .filter(pl.col('value') > 100)
    .group_by('category')
    .agg(pl.col('value').mean())
    .collect()  # ここで実行
)

# クエリプランの確認
print(lazy_df.filter(pl.col('value') > 100).explain())

Table 2. Pandas vs Polars

観点 Pandas Polars
実装言語 Python/Cython Rust
評価方式 即時評価 遅延評価(Lazy)対応
並列処理 限定的 自動並列化
メモリ効率 中程度 高い(Arrow形式)
エコシステム 非常に豊富 成長中
学習コスト 情報が豊富 新しいAPI習得が必要

3.3 Pandasとの相互変換

# Pandas -> Polars
import pandas as pd
pandas_df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
polars_df = pl.from_pandas(pandas_df)

# Polars -> Pandas
back_to_pandas = polars_df.to_pandas()

# NumPy配列との連携
import numpy as np
arr = polars_df['a'].to_numpy()

4. 大規模データ処理

メモリに収まらない大規模データを処理するパターンを解説する。

4.1 チャンク処理:データを分割して逐次処理。

import pandas as pd

# CSVをチャンクで読み込み
chunk_size = 100_000
results = []

for chunk in pd.read_csv('huge_file.csv', chunksize=chunk_size):
    # 各チャンクを処理
    processed = chunk.groupby('category')['value'].sum()
    results.append(processed)

# 結果を集約
final_result = pd.concat(results).groupby(level=0).sum()

# Polarsでのストリーミング処理
result = (
    pl.scan_csv('huge_file.csv')
    .group_by('category')
    .agg(pl.col('value').sum())
    .collect(streaming=True)  # ストリーミングモード
)

4.2 Dask:Pandasを並列・分散処理に拡張[4]。

import dask.dataframe as dd

# 大規模CSVの読み込み(遅延評価)
ddf = dd.read_csv('huge_file_*.csv')  # 複数ファイルも対応

# Pandas風のAPI
result = (
    ddf
    .groupby('category')
    .agg({'value': 'mean'})
    .compute()  # ここで実行
)

# 並列処理の設定
from dask.distributed import Client
client = Client(n_workers=4)  # 4ワーカーで並列処理

4.3 データフォーマット選択:Table 3に用途別の推奨フォーマットを示す。

Table 3. データフォーマットの比較

フォーマット 特徴 推奨用途
CSV 可読性、互換性 データ交換、小規模データ
Parquet 列指向、圧縮、高速 分析用途、大規模データ
Arrow/Feather ゼロコピー、言語間共有 プロセス間通信
HDF5 階層構造、部分読み込み 科学計算、時系列
SQLite クエリ、トランザクション 構造化データ、更新頻繁
# Parquetの使用例
import pandas as pd

# 書き込み(圧縮込み)
df.to_parquet('data.parquet', compression='snappy')

# 読み込み(特定列のみ)
df = pd.read_parquet('data.parquet', columns=['id', 'value'])

# Polarsでの高速読み込み
df = pl.read_parquet('data.parquet')

# 複数Parquetファイルの読み込み
df = pl.scan_parquet('data/*.parquet').collect()

Fig. 2にデータ処理ライブラリの選択フローを示す。

5. データバリデーション

データの品質を保証するためのバリデーション手法を解説する。

5.1 Pydanticによるレコードバリデーション:第2章で紹介したPydanticは、データレコードのバリデーションに有効。

from pydantic import BaseModel, field_validator, ValidationError
from datetime import date
from typing import Literal

class SalesRecord(BaseModel):
    id: int
    date: date
    category: Literal['A', 'B', 'C']
    amount: float
    quantity: int

    @field_validator('amount')
    @classmethod
    def amount_must_be_positive(cls, v):
        if v < 0:
            raise ValueError('amount must be non-negative')
        return v

    @field_validator('quantity')
    @classmethod
    def quantity_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('quantity must be positive')
        return v

# バリデーション
try:
    record = SalesRecord(
        id=1,
        date='2024-01-15',
        category='A',
        amount=100.0,
        quantity=5
    )
except ValidationError as e:
    print(e.json())

5.2 panderaによるDataFrameバリデーション:DataFrameスキーマを定義してバリデーション[5]。

import pandas as pd
import pandera as pa
from pandera import Column, Check, DataFrameSchema

# スキーマ定義
schema = DataFrameSchema({
    'id': Column(int, Check.greater_than(0)),
    'category': Column(str, Check.isin(['A', 'B', 'C'])),
    'value': Column(float, Check.in_range(0, 1000)),
    'date': Column(pa.DateTime),
})

# データ検証
df = pd.DataFrame({
    'id': [1, 2, 3],
    'category': ['A', 'B', 'C'],
    'value': [10.5, 20.3, 30.1],
    'date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03'])
})

validated_df = schema.validate(df)

# デコレータとしても使用可能
@pa.check_input(schema)
def process_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.groupby('category')['value'].mean()

5.3 クラスベースのスキーマ定義:より型安全なアプローチ。

import pandera as pa
from pandera.typing import DataFrame, Series

class SalesSchema(pa.DataFrameModel):
    id: Series[int] = pa.Field(gt=0)
    category: Series[str] = pa.Field(isin=['A', 'B', 'C'])
    value: Series[float] = pa.Field(ge=0, le=1000)
    date: Series[pa.DateTime]

    class Config:
        strict = True  # 定義外の列を許可しない
        coerce = True  # 型変換を試みる

# 型ヒントとしても機能
def process_sales(df: DataFrame[SalesSchema]) -> DataFrame[SalesSchema]:
    return df[df['value'] > 100]

# バリデーション実行
validated = SalesSchema.validate(df)

References

[1] NumPy, "NumPy Documentation," numpy.org, 2024.

[2] pandas, "pandas documentation," pandas.pydata.org, 2024.

[3] Polars, "Polars User Guide," docs.pola.rs, 2024.

[4] Dask, "Dask Documentation," docs.dask.org, 2024.

[5] pandera, "pandera Documentation," pandera.readthedocs.io, 2024.

免責事項
本コンテンツは2025年12月時点の情報に基づいて作成されている。各ライブラリのAPIは変更される可能性があるため、最新の公式ドキュメントを参照されたい。性能比較は環境により異なる。

← 前章:プロジェクト設計次章:機械学習フレームワーク →