E.B.3 並行プログラミング(asyncio を含む)


並行処理は、プログラムの多くの時間が「待ち」であるときに役立ちます。HTTP 呼び出し、DB 呼び出し、ファイル I/O、スクレイピング、RAG 検索、Agent のツール呼び出しなどです。CPU が重い処理を魔法のように速くするものではありません。
準備するもの
Section titled “準備するもの”- Python 3.10+
- 外部パッケージ不要
pythonを実行できるターミナル
- I/O-bound(I/O 待ち中心):大半の時間を外部システム待ちに使う処理。
- CPU-bound(CPU 計算中心):大半の時間を計算に使う処理。
- Coroutine(コルーチン):
awaitで一時停止できる非同期関数。 asyncio.gather:複数の awaitable を実行し、結果を集める。- Semaphore(セマフォ):同時に動くタスク数を制限する。
- Timeout(タイムアウト):一定時間を超えたら待つのをやめる。
制御付き非同期 batch を動かす
Section titled “制御付き非同期 batch を動かす”async_batch.py を作成します。
import asyncio
async def call_tool(name, delay): await asyncio.sleep(delay) return f"{name}:ok"
async def guarded_call(semaphore, name, delay, timeout): async with semaphore: try: return await asyncio.wait_for(call_tool(name, delay), timeout=timeout) except asyncio.TimeoutError: return f"{name}:timeout"
async def main(): semaphore = asyncio.Semaphore(2) results = await asyncio.gather( guarded_call(semaphore, "search", 0.1, 0.5), guarded_call(semaphore, "database", 0.2, 0.5), guarded_call(semaphore, "slow_tool", 1.0, 0.3), ) print(results)
asyncio.run(main())実行します。
python async_batch.py期待される出力:
['search:ok', 'database:ok', 'slow_tool:timeout']大切なのは gather だけではありません。gather、並行数の上限、タイムアウト処理を組み合わせることです。
上限を変える
Section titled “上限を変える”この小さな確認コードで、2つの上限を見てみます。
import asyncio
for limit in [2, 1]: semaphore = asyncio.Semaphore(limit) print("limit:", limit, "semaphore:", type(semaphore).__name__)期待される出力:
limit: 2 semaphore: Semaphorelimit: 1 semaphore: Semaphore最終結果は同じですが、タスクはより保守的に実行されます。実サービスでは、これにより上流 API を急なリクエストから守れます。
asyncio を使う場面
Section titled “asyncio を使う場面”向いているもの:
- 多数のネットワークリクエスト
- 複数のツール呼び出し
- 複数ソースからの RAG 検索
- DB やキュー待ち
最初の選択肢にしにくいもの:
- 重い数値計算
- 大きな画像変換
- 待ち時間のボトルネックがなく、単純さを優先したいコード
このページを終えたら、この証拠カードを残します。
- Pythonパターン
- デコレータ、イテレータ、ジェネレータ、並行処理プリミティブ、またはメタプログラミングフック
- コード成果物
- 最小限の実行可能な例と表示された出力
- 使用場面
- この pattern が AI app、pipeline、tool、または server を改善する場面
- 失敗確認
- 隠れた副作用、読みにくい抽象化、競合状態、または過度な設計
- 期待される成果
- 実践的なAIシステム用途のメモを含む小さな高度Python例
よくある間違い
Section titled “よくある間違い”- I/O-bound か確認せず、どこにでも
asyncを付ける。 - 並行数上限なしで
gatherを使う。 - タイムアウトを忘れ、遅い上流一つで全体が詰まる。
- 例外を握りつぶし、どのタスクが失敗したか記録しない。
ツール呼び出しをさらに5つ追加し、Semaphore(3) にします。その後、タイムアウトを 0.15 に下げ、いくつが :timeout になるか数えてください。
参考実装と解説
具体的なタイムアウト数は、各ツールに設定した遅延によって変わります。そのため固定の数を作るのではなく、観測した数を報告します。堅実な解答では、結果一覧と数を両方出します。
timeouts = sum(result.endswith(":timeout") for result in results)print("timeouts:", timeouts)説明では、Semaphore(3) が上流ツールへの圧力を制限し、短い timeout が遅い呼び出しを露出させることを書きます。本番では合計数だけでなく、どのツールがタイムアウトしたかもログに残します。