Pythonで大量CSVを一瞬でマージ!手作業エクセル集計を卒業する方法

手作業でExcelファイルを開いてコピペ、並び替え、結合…そんな時間、もう終わりにしませんか?
この記事では「数百〜数万ファイル・数GB規模」のCSVを現実的な速度でマージ・集計する実務的な方法を、初心者でも使える形でまとめます。
用途別に最速のツール(Python+ライブラリ、CLIツール)と実践コード、注意点、トラブルシュートまでカバーします。


この記事で得られること

  • デスクトップ/サーバーで大量CSVを短時間で結合する具体的手順
  • メモリ不足でも動く「外部メモリ」手法(chunk・dask・polars)
  • 実務でよくある問題(エンコーディング、列不揃い、重複、型崩れ)の対処法
  • バッチ化・自動化(スケジューラやWindowsタスクスケジューラ例)

1. 前提と用語確認

  • 「マージ(merge)」…キー列で結合(SQLのJOIN)。
  • 「結合/連結(concat/append/stack)」…ファイルを縦に連ねる。多くの集計はこれでOK。
  • 想定シナリオ:複数フォルダに散らばった多数のCSVを1つにまとめ、列の型を整え、重複を除いて集計レポートを作る。

2. まずは簡単・確実:pandasでの基本(小〜中規模、メモリに余裕がある場合)

# concat_all.py
import glob
import pandas as pd

files = sorted(glob.glob("data/*.csv"))  # フォルダ内のCSVを取得
dfs = [pd.read_csv(f, encoding="utf-8") for f in files]  # 必要ならencodingを変える
df = pd.concat(dfs, ignore_index=True, sort=False)  # 列が不揃いでも結合可能
df.drop_duplicates(inplace=True)  # 重複削除(必要なら)
df.to_csv("merged.csv", index=False, encoding="utf-8")

長所:コードがシンプルで型処理や集計が簡単。
短所:メモリに乗らないサイズ(例えば数GB以上のデータ)だと失敗する。


3. メモリ不足を回避する:chunked 読み込み(pandas)

メモリに乗らない大きなファイルや大量ファイルを扱うなら、chunksizeで分割して逐次書き出しましょう。

# chunk_concat.py
import glob
import pandas as pd

files = sorted(glob.glob("data/*.csv"))
out_cols = None  # 最初のファイルから列を決める

with open("merged_stream.csv", "w", encoding="utf-8", newline='') as fout:
    first = True
    for f in files:
        for chunk in pd.read_csv(f, chunksize=100000, encoding="utf-8"):
            if first:
                chunk.to_csv(fout, header=True, index=False)
                first = False
            else:
                chunk.to_csv(fout, header=False, index=False)

ポイント

  • chunksizeはメモリに応じて調整(例:100k行など)
  • 列の順序がファイルで微妙に違う場合は、最初に列名を統一してから書き出すと混乱を防げます。

4. 超高速ツール:Polars(Rust製の高速データフレーム) — 推奨(大規模向け)

Polarsは同じ操作でもpandasより高速でメモリ効率が良いです。特に「読み込み・結合・簡単集計」を高速化したい場合に最適。

pip install polars
# polars_concat.py
import polars as pl
from glob import glob

files = glob("data/*.csv")
# 複数ファイルを一気に読み込んでconcat
dfs = [pl.read_csv(f, infer_schema_length=1000) for f in files]
df = pl.concat(dfs, how="vertical")
df = df.unique()  # 重複削除
df.write_csv("merged_polars.csv")

長所

  • マルチスレッドで読み込み/処理
  • メモリ効率が良い、速度が速い

5. 分散・アウトオブコア処理:Dask(さらに巨大データ向け)

数十GB〜TB級で並列クラスタを使えるならDaskがおすすめ。ローカルでも並列に処理できます。

pip install dask[complete] fastparquet
# dask_concat.py
import dask.dataframe as dd
df = dd.read_csv("data/*.csv", encoding="utf-8", assume_missing=True)
# 必要な処理を遅延評価で定義
df = df.drop_duplicates()
df.to_csv("out/merged-*.csv", index=False, single_file=True)  # single_fileは遅いが1ファイルにまとめる

注意点

  • Daskは遅延評価(compute()を呼ばないと実行されない)
  • クラスターでスケールさせる前に小さなデータで動作確認を

6. CLIで一瞬(Linux / Mac / WSL / Git Bash) — 最速の選択肢

  • csvkitcsvstack:簡単な連結に便利
  • mlr (Miller):ストリーム処理に強い
  • awk:単純な連結なら超高速

例:単純に縦に結合(ヘッダは先頭ファイルだけ保持)

# ヘッダを残して全ファイルを結合
(head -n 1 data/file1.csv && tail -q -n +2 data/*.csv) > merged.csv

mlr(Miller)を使う例(インストール後)

mlr --csv cat data/*.csv > merged.csv

長所:非常に高速、低メモリ。
短所:複雑な型変換や細かい前処理はPythonでやるほうが楽。


7. よくあるトラブルと解決法

1) エンコーディングが混在する

  • 解決:errors='replace'encoding='cp932'(日本語Windows由来)を試す。pandasならpd.read_csv(..., encoding='cp932', errors='replace')

2) 列が揃わない(列名の違い・順序不一致)

  • 解決:読み込み時に列名を正規化(小文字化・空白削除)して統一後に結合。例:df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

3) データ型が勝手に変わる(数値が文字列になるなど)

  • 解決:読み込み後に astype で型指定、あるいは converters パラメータでカスタム変換をする。

4) 重複とキーの重複

  • 解決:どの列が重複判定かを明示して drop_duplicates(subset=[...]) を使う。JOINの一意性を確認する。

5) ヘッダ行がファイルごとに含まれる/ない

  • 解決:CLIで tail -n +2 を使ってヘッダを除いて連結するか、pandasのchunk処理で header=0/header=None を適切に使う。

8. 実務でのベストプラクティス(チェックリスト)

  1. 小さいサンプル(10ファイル)でフローを作る。
  2. 列名正規化ルールを決める(小文字化・アンダースコア化)。
  3. 日付列や数値列は読み込み時に型を指定する。
  4. ログ(どのファイルを処理したか、件数)を残す。
  5. 処理を自動化して差分だけ処理(新着ファイルのみ)する。
  6. バックアップのポリシーを決める(元ファイルは消さない)。
  7. 出力はCSVだけでなくParquet等の列指向フォーマットも検討(高速・圧縮・メタデータ保持)。

9. 具体的な自動化例:新着CSVだけマージし差分を記録する(シンプル)

# incremental_merge.py
import os
import glob
import pandas as pd

seen_file = "processed_files.txt"
processed = set()
if os.path.exists(seen_file):
    with open(seen_file) as f:
        processed = set(line.strip() for line in f)

files = sorted(glob.glob("incoming/*.csv"))
new_files = [f for f in files if f not in processed]

if not new_files:
    print("新規ファイルはありません")
else:
    for f in new_files:
        df = pd.read_csv(f)
        df.to_csv("merged.csv", mode='a', header=not os.path.exists("merged.csv"), index=False)
        with open(seen_file, "a") as sf:
            sf.write(f + "\n")
    print(f"{len(new_files)} 件を追加しました")

このスクリプトをcron(Linux)やタスクスケジューラ(Windows)で定期実行すれば、手作業がほぼ不要になります。


10. 選び方ガイド(簡単)

  • 小〜中規模(数十〜数百MB):pandasが最も楽。
  • 数GB〜数十GB:polarsを試す(高速・簡単)。
  • 数十GB〜TB:Dask(クラスタ運用)or サーバー上でCLI(mlr/awk)+Parquetに変換。
  • 単純に大量ファイルを「ただ繋げたい」だけ:CLIのcat/tail/mlrが最速。

11. まとめ(実行プラン)

  1. まずはサンプル10ファイルでフロー確認。
  2. 列名正規化 → 型指定 → chunkまたはpolarsで結合。
  3. 重複削除・キー整合性チェック。
  4. 出力はCSV/Parquetどちらかを選択(分析頻度が高ければParquet推奨)。
  5. 自動化スクリプトを作って定期実行し、ログを残す。

付録:よく使うワンライナー&コマンド

ヘッダを残して複数CSVを結合(bash)

(head -n 1 data/file1.csv && tail -q -n +2 data/*.csv) > merged.csv

Miller(mlr)で簡単にcat

mlr --csv cat data/*.csv > merged.csv

Parquetにして高速に扱う(pandas→pyarrow)

import pandas as pd
df = pd.concat([pd.read_csv(f) for f in files])
df.to_parquet("merged.parquet")
タイトルとURLをコピーしました