コンテンツにスキップ

E.B.3 並行プログラミング(asyncio を含む)

asyncio 並行制御フローチャート

非同期タスクのタイムアウト・キャンセル・レート制限図

並行処理は、プログラムの多くの時間が「待ち」であるときに役立ちます。HTTP 呼び出し、DB 呼び出し、ファイル I/O、スクレイピング、RAG 検索、Agent のツール呼び出しなどです。CPU が重い処理を魔法のように速くするものではありません。

  • Python 3.10+
  • 外部パッケージ不要
  • python を実行できるターミナル
  • I/O-bound(I/O 待ち中心):大半の時間を外部システム待ちに使う処理。
  • CPU-bound(CPU 計算中心):大半の時間を計算に使う処理。
  • Coroutine(コルーチン)await で一時停止できる非同期関数。
  • asyncio.gather:複数の awaitable を実行し、結果を集める。
  • Semaphore(セマフォ):同時に動くタスク数を制限する。
  • Timeout(タイムアウト):一定時間を超えたら待つのをやめる。

async_batch.py を作成します。

import asyncio
async def call_tool(name, delay):
await asyncio.sleep(delay)
return f"{name}:ok"
async def guarded_call(semaphore, name, delay, timeout):
async with semaphore:
try:
return await asyncio.wait_for(call_tool(name, delay), timeout=timeout)
except asyncio.TimeoutError:
return f"{name}:timeout"
async def main():
semaphore = asyncio.Semaphore(2)
results = await asyncio.gather(
guarded_call(semaphore, "search", 0.1, 0.5),
guarded_call(semaphore, "database", 0.2, 0.5),
guarded_call(semaphore, "slow_tool", 1.0, 0.3),
)
print(results)
asyncio.run(main())

実行します。

Terminal window
python async_batch.py

期待される出力:

Terminal window
['search:ok', 'database:ok', 'slow_tool:timeout']

大切なのは gather だけではありません。gather、並行数の上限、タイムアウト処理を組み合わせることです。

この小さな確認コードで、2つの上限を見てみます。

import asyncio
for limit in [2, 1]:
semaphore = asyncio.Semaphore(limit)
print("limit:", limit, "semaphore:", type(semaphore).__name__)

期待される出力:

Terminal window
limit: 2 semaphore: Semaphore
limit: 1 semaphore: Semaphore

最終結果は同じですが、タスクはより保守的に実行されます。実サービスでは、これにより上流 API を急なリクエストから守れます。

向いているもの:

  1. 多数のネットワークリクエスト
  2. 複数のツール呼び出し
  3. 複数ソースからの RAG 検索
  4. DB やキュー待ち

最初の選択肢にしにくいもの:

  1. 重い数値計算
  2. 大きな画像変換
  3. 待ち時間のボトルネックがなく、単純さを優先したいコード

このページを終えたら、この証拠カードを残します。

Pythonパターン
デコレータ、イテレータ、ジェネレータ、並行処理プリミティブ、またはメタプログラミングフック
コード成果物
最小限の実行可能な例と表示された出力
使用場面
この pattern が AI app、pipeline、tool、または server を改善する場面
失敗確認
隠れた副作用、読みにくい抽象化、競合状態、または過度な設計
期待される成果
実践的なAIシステム用途のメモを含む小さな高度Python例
  • I/O-bound か確認せず、どこにでも async を付ける。
  • 並行数上限なしで gather を使う。
  • タイムアウトを忘れ、遅い上流一つで全体が詰まる。
  • 例外を握りつぶし、どのタスクが失敗したか記録しない。

ツール呼び出しをさらに5つ追加し、Semaphore(3) にします。その後、タイムアウトを 0.15 に下げ、いくつが :timeout になるか数えてください。

参考実装と解説

具体的なタイムアウト数は、各ツールに設定した遅延によって変わります。そのため固定の数を作るのではなく、観測した数を報告します。堅実な解答では、結果一覧と数を両方出します。

timeouts = sum(result.endswith(":timeout") for result in results)
print("timeouts:", timeouts)

説明では、Semaphore(3) が上流ツールへの圧力を制限し、短い timeout が遅い呼び出しを露出させることを書きます。本番では合計数だけでなく、どのツールがタイムアウトしたかもログに残します。