はじめる前に
- 必須AWS アカウントを持ち、マネジメントコンソールにサインインできること
- 必須DynamoDB でテーブルを作成し、項目を出し入れした経験
- 必須Lambda 関数をコンソールで作成した経験
- 必須IAM ロールの基本的な役割を理解していること
- あると良い「イベント駆動」(何かが起きたことをきっかけに処理が動く考え方)という言葉を聞いたことがある
※ コマンドラインや SSH の操作は一切ありません。すべてブラウザのコンソール上で完結します。
参照する公式ドキュメント
手順に迷ったときや、用語の意味を確かめたいときに開きましょう。
※ リンク切れの場合は、ページタイトルで検索してください。
背景・シナリオ
「データが追加されたら、すぐに何か処理をしたい」という場面があります。定期的にテーブルを確認しに行く(ポーリングする)方法もありますが、無駄が多く、反応も遅れがちです。DynamoDB Streams を有効化すると、テーブルへの変更(追加・更新・削除)が発生した瞬間にその内容を記録した「ストリーム」が作られ、Lambda 関数を自動的に起動するきっかけにできます。今回はその仕組みを実際に体験します。
定期的にテーブルを確認する方法と何が違うの?
定期確認(ポーリング)は、変更がなくても毎回確認のための処理が走り、変更から検知までに時間差が生まれます。Streams を使う方法は、変更があったときだけ Lambda が起動するため、無駄がなく、ほぼリアルタイムに反応できます。
変更前のデータと変更後のデータ、両方が分かるの?
ストリームの設定(ビュータイプ)によります。「新旧の両方のイメージ」を選ぶと、変更前・変更後の両方の内容を受け取れます。今回はこの設定を使い、何が・どう変わったかを確認します。
DynamoDB Streams を有効化(ビュータイプ「新旧イメージ」)し、それをトリガーとする Lambda 関数を作成する。テーブルに項目を追加・更新・削除し、それぞれの変更内容が CloudWatch Logs に記録されることを確認する。
つくる構成
テーブルへの変更が DynamoDB Streams によって検知され、Lambda 関数を起動して CloudWatch Logs へ出力するまでの流れです。3 つの要素が一列につながるパイプラインです。
(追加・更新・削除)
レコードを生成
へ出力
要件
以下の要件を満たし、テーブルの変更が Lambda 関数を経由して CloudWatch Logs に記録される一連の流れを確認してください。
| No | 要件 |
|---|---|
| 1 | リージョンは「東京(ap-northeast-1)」を使用する。 |
| 2 | パーティションキーを持つテーブルを 1 つ作成する。テーブル名・パーティションキー名は自由(例:StreamDemoTable / ItemId)。 |
| 3 | テーブルの「Streams(DynamoDB Streams 機能)」を有効化し、ビュータイプを「新旧イメージ」にする。 |
| 4 | Lambda 関数を 1 つ作成する(ランタイムは Python、名前は自由)。下記のコードを貼り付ける。 |
| 5 | Lambda 関数のトリガーとして、手順 2 のテーブルの DynamoDB Streams を追加する。 |
| 6 | テーブルに項目を追加・更新・削除し、それぞれ CloudWatch Logs に変更内容が記録されることを確認する。 |
def lambda_handler(event, context):
for record in event["Records"]:
event_name = record["eventName"]
keys = record["dynamodb"].get("Keys", {})
print(f"変更の種類: {event_name}")
print(f"対象のキー: {keys}")
if "OldImage" in record["dynamodb"]:
print(f"変更前: {record['dynamodb']['OldImage']}")
if "NewImage" in record["dynamodb"]:
print(f"変更後: {record['dynamodb']['NewImage']}")
return {"processedCount": len(event["Records"])}
構築の進め方
「テーブルを作る → Streams を有効化する → Lambda を作る → トリガーをつなぐ → 変更して確かめる」の順に進めます。
-
マネジメントコンソールにサインインし、リージョンを合わせる
ブラウザで AWS マネジメントコンソールにサインインし、画面右上のリージョンが「アジアパシフィック(東京)ap-northeast-1」になっていることを確認します。
-
DynamoDB コンソールでテーブルを作成する
DynamoDB コンソールの「テーブルの作成」を開き、テーブル名とパーティションキー名を自由に決めて作成します(例:テーブル名
StreamDemoTable、パーティションキーItemId)。設定はデフォルトのままでよいテーブルクラスや読み書きの設定は、今回はデフォルトのままで問題ありません。
-
テーブルの「エクスポートとストリーム」タブから、DynamoDB Streams を有効化する
作成したテーブルを開き、「エクスポートとストリーム」タブを選びます。「DynamoDB streams の詳細」の「有効にする」を押し、ビュータイプとして「新規および旧イメージ(新旧の両方のイメージ)」を選択して有効化します。
ビュータイプの選び方ビュータイプには「新しいイメージのみ」「古いイメージのみ」「新旧両方のイメージ」「キーのみ」などの選択肢があります。今回は変更前後の両方を見たいため、「新旧両方のイメージ」を選びます。
-
Lambda 関数を作成する
Lambda コンソールで「関数の作成」→「一から作成」を選びます。関数名を自由に決め、ランタイムはPythonの任意のバージョンを選んで作成します。コードエディタの中身を、上記の要件セクションに掲載したコードに置き換え、「Deploy」ボタンを押して保存します。
-
Lambda 関数の「トリガーを追加」から、DynamoDB Streams を接続する
作成した Lambda 関数の画面で「トリガーを追加」を選び、トリガーの種類として「DynamoDB」を選びます。DynamoDB テーブルの欄で手順 2 のテーブルを指定し、トリガーを作成します。
権限はコンソールが自動で設定してくれる場合が多いこのトリガーを追加すると、Lambda 関数の実行ロールに DynamoDB Streams を読み取るための権限が、コンソール側で自動的に追加される場合が多いです。
-
DynamoDB の「項目を探索」から、新しい項目を 1 件追加する
テーブルの「項目を探索」タブを開き、「項目を作成」から、パーティションキーの値と、任意の属性(例:
Status=new)を持つ項目を 1 件追加します。 -
追加した項目の属性を更新し、続けて削除する
同じ項目を開き、先ほど追加した属性の値を変更して保存します(更新)。その後、同じ項目を選んで削除します。
3 種類の変更を意図的に発生させる追加・更新・削除という 3 種類の変更を順番に発生させることで、それぞれに対応するログの違いを見比べられるようにします。
-
CloudWatch Logs でログを確認する
CloudWatch コンソールの「ロググループ」から、Lambda 関数に対応するロググループ(
/aws/lambda/関数名)を開きます。最新のログストリームを開き、追加・更新・削除それぞれに対応するログが出力されていることを確認します。変更の種類(イベント名)、変更前/変更後の内容に注目しましょう。
つまずきポイント
初学者がよく引っかかる箇所を先回りでまとめました。答えそのものは載せていませんが、「どこを見直せばよいか」の手がかりとして使ってください。
「項目を変更しても、CloudWatch Logs に何も増えない」
Lambda 関数の実行ロールに、DynamoDB Streams を読み取る権限が不足している可能性があります。トリガー追加時にコンソールが自動でロールに権限を追加してくれる場合が多いですが、念のため Lambda 関数の「モニタリング」タブで実行エラーが起きていないかも確認してみましょう。
「削除したときの変更前の内容が見当たらない、または更新後の内容が空に見える」
変更の種類ごとに、含まれる情報が異なります。追加イベントには「変更前」の内容が存在せず、削除イベントには「変更後」の内容が存在しません。今回のコードでは "OldImage" in record["dynamodb"] のように存在確認をしていますが、どの変更の種類のときに、どちらが存在しないかを見比べてみましょう。
完了チェック
要件の再確認ではなく、画面のどこを見れば達成を確認できるかをまとめました。各コンソールを開いて、次を順に確かめましょう。
- テーブルの「エクスポートとストリーム」タブで、DynamoDB Streams が有効になっている。
- Lambda 関数の「トリガー」欄に、DynamoDB が表示されている。
- CloudWatch Logs に、3 種類(追加・更新・削除)のログが見つかる。
- 各ログで、変更の種類(INSERT / MODIFY / REMOVE)が読み取れる。
- 更新のログで、変更前・変更後の値が両方確認できる。
考えてみよう
手を動かすことに加えて、次の問いに自分の言葉で答えられるようにしておくと、理解がより深まります。
- 定期的にテーブルを確認する方法と比べて、Streams を使う方法はどんな場面で特に効果を発揮するでしょうか。
ヒント
変更の発生頻度が低く、かつ変更からの反応の速さが重要な場面を考えてみましょう。定期確認では、確認の間隔を短くするほど無駄な処理が増え、間隔を長くするほど反応が遅れるというトレードオフがあります。 - 同時に複数の項目を変更したら、Lambda は何回起動しそうでしょうか(1 件ずつか、まとめてか、という観点で考えてみましょう)。
ヒント
今回のコードはevent["Records"]をループで処理しています。これは、1 回の Lambda の起動で複数件の変更レコードがまとめて渡される場合があることを意味します。「変更 1 件 = 起動 1 回」とは限らない、という前提で考えてみましょう。 - 今回はログに出力するだけでしたが、変更内容をもとに別のテーブルへ反映したり、通知を送ったりする処理に発展させるとしたら、何に注意が必要そうでしょうか。
ヒント
同じ変更レコードが何らかの理由で複数回処理される可能性や、処理に失敗したときの再試行の挙動を考えてみましょう。「1 回だけ確実に処理される」とは限らない前提で、繰り返し実行されても結果が変わらないような処理の作り方が手がかりになります。
後片づけ
作ったものを片づけるところまでが一連の流れです。次の順で進めると迷いにくいです。
- Lambda 関数を削除する:Lambda コンソールの関数一覧から、作成した関数を選び、削除します。
- トリガーが解除されていることを確認する:Lambda 関数を削除すると、DynamoDB Streams のトリガーも合わせて解除されます。
- DynamoDB テーブルを削除する:DynamoDB コンソールのテーブル一覧から、作成したテーブル(例:
StreamDemoTable)を選び、削除します。 - CloudWatch Logs のロググループを削除する:
/aws/lambda/関数名のロググループを CloudWatch Logs コンソールから削除します。
削除するのは、このハンズオンで自分が作ったものだけ
テーブルやロググループの一覧には、すでに使っている他のリソースも並んでいるかもしれません。間違えて削除しないよう、このハンズオンのために付けた名前のものだけを選んで削除してください。
コストに関する注意: DynamoDB Streams の読み取り自体には、Lambda との統合で使う範囲では追加料金がかからない場合が多いですが、Lambda の実行回数・実行時間に応じた料金が発生します(今回程度の少量であれば無料利用枠の範囲内に収まることが多いです)。CloudWatch Logs もログの取り込み・保存量に応じて料金が発生するため、検証が終わったら上の「後片づけ」に沿って削除することをおすすめします。最新の料金は AWS の公式料金ページで確認してください。