はじめまして。早稲田大学創造理工学研究科 M1の堀田南と申します。AI事業本部 協業リテールメディアディビジョン プリズムパートナーカンパニーで約2ヶ月間インターンをしていました。

本記事では、自然言語を入力すると、LLMを介して所望のSQL文を出力する「TextToSQL」の開発において、AWS Bedrock Agent(以下、エージェント)をStreamlit in Snowflakeを用いてサーバーレスでアプリ化する方法について解説します。日本語での実装記事が少なく、開発中に多くの課題に直面したため、この記事が皆様の開発の一助となれば幸いです。

目次

はじめに

TextToSQLの基本概念については、自然言語からSQL生成!AWS Bedrock Agent × Streamlit in Snowflakeで実現するTextToSQLシステム をご参照ください。

今回の実装では、AWS Knowledge Base(以下、ナレッジベース)を活用したRAGシステムをサーバーレスでアプリケーション化するために、Streamlit in Snowflakeを採用しました。よりセキュアな接続にするため、IAMロールとExternalId(外部ID)による接続方式を採用しています。このExternalIdを用いた連携手法は一般的な解説記事では取り上げられることが少なく、実運用環境での実装例としては比較的珍しいアプローチと言えます。

また、ナレッジベースだけでは対話機能が制限されるため(DynamoDB等を用いれば実装可能ですが複雑になります)、ナレッジベースを参照するエージェントを構築し、Streamlit in Snowflakeと連携することで完全な対話型アプリケーションを実現しています。

Web上には、AWS Bedrockのナレッジベースやエージェントを「試しに使ってみた(RAGとして使えるか、基盤モデルはどんな感じか)」といった検証レベルの記事は数多く存在していますが、実際の業務システムを見据えた技術選定や、企業のセキュリティ要件を満たす実装方法を詳細に解説した記事はほとんど見られません(2025年2月時点)。本実装は、PoCの域を超え、本番環境でも運用可能な堅牢なアーキテクチャ設計を目指しています。

アーキテクチャ

開発したシステムは、AWS Knowledge Baseを利用したRAGをStreamlit in Snowflakeで実装しています。類似の機能を持つSnowflake Cortexも選択肢として検討しましたが、セマンティックモデル構築の煩雑さがあり、柔軟性、カスタマイズ性といった点で今回はAWS Bedrockを利用することにしました。

対話機能の実装において、AWS Bedrockにはconverse APIも存在しますが、これはナレッジベースやエージェントと直接連携することができません。また、ナレッジベースをinvoke_modelで直接呼び出す方法では対話履歴を保持する機能がなく、入力制限が1000トークンに制限されているため、過去の会話履歴を次のリクエストに含めることが困難です。

これらの制約を克服するため、本システムではナレッジベースと接続されているエージェントをinvoke_agent APIで呼び出す設計を採用しました。この方法により、セッションIDを通じた対話履歴の保持が可能となり、さらにエージェントのチューニングを通じて検索パラメータや推論プロセスをカスタマイズすることができます。結果として、複雑なSQL生成タスクに最適化された、高度な対話機能を備えたアプリケーションを実現しています。

そして、接続方法の詳細についてはこれから解説します。よりセキュアな接続にするため、IAMロール+ExternalIdによる接続方式を採用しています。

AWSサイドの設定

ナレッジベース・エージェントの作成

ここでは、作成手順は割愛します。詳細は、公式ドキュメントを参照してください。

IAMロール・ポリシーの設定

任意のナレッジベースやエージェントをクリックすると、「サービスロール」または「許可」の欄にリンクが表示されます。そのリンクをクリックすると、IAMロールの画面に遷移します。

例えば、エージェントの場合はこのような画面になり、信頼ポリシー・アクセス許可ポリシーにそれぞれアクセスできます。

ナレッジベースのIAMロール設定

信頼ポリシー:

基本的には、デフォルトの設定のままで構いません。

{
    // 1. 信頼ポリシー - BedrockサービスとAWSプリンシパルに対してロール引き受けを許可するポリシー
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonBedrockKnowledgeBaseTrustPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "<SourceAcount>"
                },
                "ArnLike": {
                    "aws:SourceArn": "<ナレッジベースのARN>"
                }
            }
        }
    ]
}

アクセス許可ポリシー1:

{
    // 2. Bedrock権限ポリシー - モデル呼び出し、データ取得、再ランク付け機能へのアクセスを提供するポリシー
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowInference",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream",
            ],
            "Resource": [
                "arn:aws:bedrock:*::*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:Retrieve"
            ],
            "Resource": "<ナレッジベースのARN>"
        },
        {
            "Sid": "AllowBedrockRerank",
            "Effect": "Allow",
            "Action": [
                "bedrock:Rerank"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeAgent",
            ],
            "Resource": "*"
        }
    ]
}

アクセス許可ポリシー2:

{
    // 3. S3アクセスポリシー - ナレッジベースが参照するS3データへの読み取りアクセスを許可するポリシー
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ListBucketStatement",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "<S3のARN>"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceAccount": [
                        "<SourceAcount>"
                    ]
                }
            }
        },
        {
            "Sid": "S3GetObjectStatement",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "<S3のARN>"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceAccount": [
                        "<SourceAcount>"
                    ]
                }
            }
        }
    ]
}

エージェントのIAMロール設定

信頼ポリシー

<エージェントのExternalId>は、後述のbedrock_security_integrationで取得します。アクセス許可ポリシーは、既存のポリシーに追記しても、新しくポリシーを追加しても構いません。

{
    // 1. 信頼ポリシー - BedrockサービスとAWSプリンシパルにロールの引き受けを許可するポリシー
    "Version": "2012-10-17",
    "Statement": [
        { // 元からある部分 - Bedrockサービスに特定のエージェントからのアクセスを許可
            "Sid": "AmazonBedrockAgentBedrockFoundationModelPolicyProd",
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "<SourceAcount>"
                },
                "ArnLike": {
                    "aws:SourceArn": "<エージェントのARN>"
                }
            }
        },
        { // 追加部分 - 特定のAWSプリンシパルに外部ID条件付きでロール引き受けを許可
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "AWS": "<aws principal>"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<エージェントのExternalId>"
                }
            }
        }
    ]
}

アクセス許可ポリシー1:

{
    // 2. Bedrock基盤モデルポリシー - エージェントが特定のモデルとエージェント自身にアクセスするための権限
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonBedrockAgentBedrockFoundationModelPolicyProd",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "<基盤モデルのARN>"
            ]
        },
        {
            "Sid": "BedrockAgentRuntimeAccess",
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeAgent"
            ],
            "Resource": [
                "<エージェントのARN>",
                "<エージェントエイリアスのARN>"
            ]
        }
    ]
}

アクセス許可ポリシー2:

{
    // 3. ナレッジベース取得ポリシー - エージェントが特定のナレッジベースからデータを取得する権限
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmazonBedrockAgentRetrieveKnowledgeBasePolicyProd",
            "Effect": "Allow",
            "Action": [
                "bedrock:Retrieve"
            ],
            "Resource": [
                "<ナレッジベースのARN>"
            ]
        }
    ]
}

Snowflakeサイドの設定

SnowflakeからAWS Bedrockにアクセスするための設定が必要です。以下の記事も参考になります:

Snowflakeから AWS Bedrockへのアクセス設定

まず、Snowflake Worksheetを作成します。外部API認証とシークレットに必要な権限については、Snowflakeの公式ドキュメントを参照してください。

Snowflake Worksheetで以下のSQLを実行します:

-- AWS ロールARN とのセキュリティ統合を作成する
CREATE OR REPLACE SECURITY INTEGRATION bedrock_security_integration
TYPE = API_AUTHENTICATION
AUTH_TYPE = AWS_IAM
ENABLED = TRUE
AWS_ROLE_ARN = '<エージェントのARN>';

-- 信頼関係の統合情報を表示する
DESC SECURITY INTEGRATION bedrock_security_integration;

ここで得られたAPI_AWS_EXTERNAL_IDの値を、エージェントの信頼ポリシー中の<エージェントのExternalId>に代入します。

💡
注意点
本記事では、bedrock_security_integration ステートメントを再度実行すると、新しい API_AWS_EXTERNAL_ID が発行され、IAMロールの信頼ポリシーを再度更新する必要が生じます。このため、後述のユーザー定義関数の動作確認を行う際には、 bedrock_security_integration の再実行は避けることをお勧めします。一度設定した統合情報はそのまま使用することで、不要な認証情報の再発行を防ぎます。

 

続けて、SnowflakeからAmazon Bedrockの生成AIモデルを呼び出したり、データを連携したりするために、Snowflake Worksheetで以下のSQLを実行します:

-- Amazon Bedrockへ認証するためのシークレットキーを作成する
CREATE OR REPLACE SECRET aws_bedrock_access_token
TYPE = CLOUD_PROVIDER_TOKEN
API_AUTHENTICATION = bedrock_security_integration;

-- Amazon Bedrockサービスへの外部通信を許可するネットワークルールを作成する
CREATE OR REPLACE NETWORK RULE bedrock_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('bedrock-runtime.<region>.amazonaws.com');
  
-- SnowflakeからAmazon Bedrockの機能にアクセスするための、外部アクセスオブジェクトを作成する
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION bedrock_external_access_integration
ALLOWED_NETWORK_RULES = (bedrock_network_rule)
ALLOWED_AUTHENTICATION_SECRETS=(aws_bedrock_access_token)
ENABLED=true
;

AWS Bedrock エージェントを呼び出すためのユーザー定義関数(UDF)

続けて、AWS Bedrockのエージェントを呼び出すために、Snowflake Worksheetで以下のSQLを実行します:

CREATE OR REPLACE FUNCTION ask_bedrock(
user_context VARCHAR
)
RETURNS VARCHAR
LANGUAGE PYTHON
EXTERNAL_ACCESS_INTEGRATIONS = (bedrock_external_access_integration)
RUNTIME_VERSION = '3.11'
SECRETS = ('cred' = aws_bedrock_access_token)
PACKAGES = ('boto3')
HANDLER = 'bedrock_py'
AS
$$
import boto3
import json
import _snowflake
import uuid
import traceback

AGENT_ID = '<10桁のエージェントID>'
AGENT_ALIAS_ID = '<10桁のエージェントエイリアスID>'

def bedrock_py(user_context):
    cloud_provider_object = _snowflake.get_cloud_provider_token("cred")  
    boto3_session_args = {
        "aws_access_key_id": cloud_provider_object.access_key_id,
        "aws_secret_access_k": cloud_provider_object.secret_access_key,
        "aws_session_token": cloud_provider_object.token,
        "region_name": "ap-northeast-1",
    }
	  session_id = str(uuid.uuid4())
	  
    session = boto3.Session(**boto3_session_args)
    bedrock_agent_runtime_client = session.client("bedrock-agent-runtime")
    
    try:
        # エージェントを呼び出し
        response = bedrock_agent_runtime_client.invoke_agent(
            agentId=AGENT_ID,
            agentAliasId=AGENT_ALIAS_ID,
            sessionId=session_id,
            inputText=user_context,
            enableTrace=False
        )
        
        # invoke_agent のレスポンス処理
        full_response = ""
        if "completion" in response:
            for event in response["completion"]:
                if "chunk" in event and "bytes" in event["chunk"]:
                    chunk_text = event["chunk"]["bytes"].decode("utf-8")
                    full_response += chunk_text
        
        # レスポンスがない場合のフォールバック
        if not full_response:
            # レスポンス全体を確認(デバッグ用)
            return f"No text in response. Response structure: {str(response)}"
        
        return full_response
    
    except Exception as e:
        # エラー詳細を返す
        return f"Error: {str(e)}\nType: {type(e).__name__}\nTraceback: {traceback.format_exc()}"
$$;

テスト実行

UDFの動作確認を行うため、続けてSnowflake Worksheetで以下のSQLコマンドを実行します:

PROMPT = '{任意のプロンプト書く}';
SELECT bedrock_py($PROMPT);

このコマンドが正常に動作し、エージェントからの応答が返ってくれば、接続設定は成功しています。もしエラーが返された場合は、Snowflakeの権限(実行に必要な権限が付与されているか)、IAMの権限(最新のExternalIdが代入されているか、権限は不足していないか)や変数・引数等のミスを確認し、正しく動くまで原因究明を行ってください。

認証情報取得関数の作成

接続が確認できたら、次のステップとして、Streamlit in Snowflakeで使用するためのより汎用的な認証情報取得関数を作成します。上述のbedrock_py()を置換する対応で構いませんので、以下のようにget_cred()関数を作成します。この関数はAWS Bedrockへのアクセスに必要な認証情報をJSON形式で返します:

REATE OR REPLACE FUNCTION get_cred()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
EXTERNAL_ACCESS_INTEGRATIONS = (bedrock_external_access_integration)
SECRETS = ('cred' = aws_bedrock_access_token)
PACKAGES = ('boto3')
HANDLER = 'get_cred'
AS
$$
import boto3
import json
import _snowflake
import uuid

def get_cred():
    cloud_provider_object = _snowflake.get_cloud_provider_token("cred")
    cloud_provider_dictionary = {
        "ACCESS_KEY_ID": cloud_provider_object.access_key_id,
        "SECRET_ACCESS_KEY": cloud_provider_object.secret_access_key,
        "TOKEN": cloud_provider_object.token,
    }
    return json.dumps(cloud_provider_dictionary)
$$;

SELECT get_cred();

この関数は、Streamlit in Snowflakeアプリケーションからのアクセスに必要な一時的な認証情報を安全に取得するために使用されます。

Streamlit in Snowflakeでの実装

次に、チャットボットのUIをStreamlit in Snowflakeで実装します。Streamlit in Snowflakeアプリケーションを実行するには、特定の権限が必要です。詳細については、Snowflakeの公式ドキュメントを参照してください。

また、Streamlitアプリケーションを作成する際には、Snowflake上の特定のdatabase.schemaを選択します。ここで、選択したスキーマにアクセス権を持つユーザーのみがアプリケーションを使用できます。本番環境へのデプロイ前に、セキュリティ設計を慎重に検討してください。

トークン受け渡しの実装

アプリケーション内では、以下のコードで認証情報を取得します:

import json
from snowflake.snowpark.context import get_active_session

session = get_active_session()
aws_bedrock_cred = json.loads(session.sql("SELECT {database.schema}.get_cred()").to_pandas().iloc[0,0])

取得した認証情報はaws_bedrock_cred辞書に格納され、API呼び出し時に使用します。

外部アクセス許可の設定

Streamlit in Snowflakeアプリケーションが外部のAWS Bedrockサービスにアクセスするためには、アプリケーション設定で外部アクセス統合を許可する必要があります:

  1. アプリ設定画面の「App settings」にアクセス
  2. 「External networks」タブを選択
  3. 作成したEXTERNAL ACCESS INTEGRATION(bedrock_external_access_integration)を許可リストに追加

API呼び出しの実装

エージェントを呼び出すコードは以下のとおりです。セッション状態やパラメータの詳細については、AWS Bedrock AgentやKnowledge Baseを活用したRAG構築時に効果的なパラメータチューニングを参照してください:

boto3_session_args = {
  "aws_access_key_id": aws_bedrock_cred["ACCESS_KEY_ID"],
  "aws_secret_access_key": aws_bedrock_cred["SECRET_ACCESS_KEY"],
  "aws_session_token": aws_bedrock_cred["TOKEN"],
  "region_name": REGION_NAME
}
session = boto3.Session(**boto3_session_args)
bedrock_agent_runtime = session.client("bedrock-agent-runtime")
    
response = bedrock_agent_runtime.invoke_agent(
    agentId=AGENT_ID,
    agentAliasId=AGENT_ALIAS_ID,
    sessionId=st.session_state.session_id,
    inputText=input,
    endSession=False,
    enableTrace=False,
    sessionState={
     # 略
    }
)

アプリ全体のコード例

以下にStreamlit in Snowflakeで実装したチャットインターフェースの完全なコード例を示します。このコードは会話履歴の管理、新規チャットの作過去のチャット履歴の表示など、基本的なチャット機能を備えています:

import json
import uuid

import boto3
import streamlit as st
from snowflake.snowpark.context import get_active_session


# Snowflakeのアクティブセッションを取得
# 現在のSnowflakeコネクションからセッション情報を取得する
session = get_active_session()

# Bedrockの認証情報をSnowflakeから取得
# Snowflakeに保存されているAWS Bedrock接続のための認証情報をSQL経由で取得
aws_bedrock_cred = json.loads(session.sql("SELECT {database.schema}.get_cred_agents()").to_pandas().iloc[0,0])

# AWS Bedrockの設定情報
AGENT_ID = '<10桁のエージェントID>'  # BedrockエージェントのID
AGENT_ALIAS_ID = '<10桁のエージェントエイリアスID>'  # エージェントのエイリアスID
REGION_NAME = '<リージョン名>'  # 東京リージョンの場合、'ap-northeast-1'

def init_page():
    """
    Streamlitページの初期化を行う関数
    """
    st.set_page_config(page_title="ChatBot", page_icon="🤗")
    
    if "messages" not in st.session_state:
        st.session_state.messages = []
        
    if "chat_title" not in st.session_state:
        st.session_state.chat_title = "New chat"

    if "session_id" not in st.session_state:
        st.session_state.session_id = str(uuid.uuid4())

def show_page():
    """
    Streamlitページの主要コンポーネントを表示する関数
    """
    # チャットタイトルの表示
    st.title(st.session_state.chat_title)
    
    # チャット履歴を表示
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # ユーザーからの新しい入力を取得(Bedrock用)
    if prompt_user := st.chat_input("何か質問をどうぞ"):
        # ユーザーメッセージをセッションに追加
        user_message = {"role": "Human", "content": prompt_user}
        st.session_state.messages.append(user_message)
        
        with st.chat_message("Human"):
            st.markdown(prompt_user)
        
        with st.chat_message("Assistant"):
            with st.spinner("回答を生成中..."):
                message_placeholder = st.empty()
                
                # エージェントからの回答を取得
                full_response = _invoke_agent(prompt_user, message_placeholder)
                
                # アシスタントの回答をセッションに追加
                assistant_message = {"role": "Assistant", "content": full_response}
                st.session_state.messages.append(assistant_message)

def _invoke_agent(input, message_placeholder):
    """
    AWS Bedrockエージェントを呼び出す関数
        
    AWS Bedrockエージェントを呼び出し、セッションIDを利用して会話の連続性を保ちながら
    ストリーミングレスポンスとして結果を処理・表示する
    """
    # Bedrockからのストリーミング応答を処理
    boto3_session_args = {
		  "aws_access_key_id": aws_bedrock_cred["ACCESS_KEY_ID"],
		  "aws_secret_access_key": aws_bedrock_cred["SECRET_ACCESS_KEY"],
		  "aws_session_token": aws_bedrock_cred["TOKEN"],
		  "region_name": REGION_NAME
		}
    session = boto3.Session(**boto3_session_args)
    bedrock_agent_runtime = session.client("bedrock-agent-runtime")
        
    # Bedrockエージェントを呼び出す
    response = bedrock_agent_runtime.invoke_agent(
        agentId=AGENT_ID,
        agentAliasId=AGENT_ALIAS_ID,
        inputText=input,
        sessionId=st.session_state.session_id,
        endSession=False,
        enableTrace=False,
        sessionState={
        # 略
		    }
    )
    
    # レスポンスの処理とストリーミング表示
    full_response = ""
    if "completion" in response:
        for event in response["completion"]:
            if "chunk" in event and "bytes" in event["chunk"]:
                chunk_text = event["chunk"]["bytes"].decode("utf-8")
                full_response += chunk_text
            message_placeholder.markdown(full_response)
    
    return full_response

# メイン関数
def main():
    init_page()
    show_page()

if __name__ == "__main__":
    main()

なお、_invoke_agent中の細かいパラメータ設定の詳細については、AWS Bedrock AgentやKnowledge Baseを活用したRAG構築時に効果的なパラメータチューニング をご参照ください。

まとめ

AWS BedockのエージェントとナレッジベースをStreamlit in Snowflakeに接続する技術的な実装方法を詳述しました。よりセキュアな接続にするため、IAMロールとExternalIdによる接続方式を採用しています。

主要ポイント:

  • AWSサイドの設定
    • ナレッジベースとエージェントのそれぞれに必要な信頼ポリシーとアクセス許可ポリシーの設定例
  • Snowflakeサイドの設定
    • セキュリティ統合、シークレット、ネットワークルール、外部アクセス統合の作成
    • エージェントを呼び出すユーザー定義関数(UDF)の実装
    • トークン受け渡しの方法とテスト実行
  • Streamlit in Snowflakeでの実装
    • 外部アクセス統合の許可設定
    • API呼び出しとレスポンス処理の実装