
SnowflakeのS3外部ステージにStreamlitアプリからオブジェクトをアップロード・削除する
データ事業本部の鈴木です。
Snowflake in Streamlitで実行しているアプリから外部ステージとして設定したS3バケットへのオブジェクトアップロードや削除をしたいことがありました。
アプリから内部ステージにアップロードする場合は情報があるものの、外部ステージへのアップロードや削除の場合は意外とどうするといいか見つけられなかったので、試してみた内容をまとめます。
ステージへのアップロードは以下が参考になりました。この記事では特に外部ステージへのアップロードや削除の場合を中心に記載します。
この記事のまとめ
先に結論
以下がポイントでした。
- Streamlitアプリはコンテナランタイムで実行する
- アップロードは内部ステージに一度アップロードして、外部ステージにコピーする
- 削除はストレージ統合に設定したIAMロールの権限に必要な権限を付与する
例えば以下のようなアプリだと実現できました。

実装したコード例
データベース名・スキーマ名・内部ステージ名は自身のものに置き換えてください。
import io
import uuid
from pathlib import PurePosixPath
import streamlit as st
from snowflake.snowpark.context import get_active_session
INTERNAL_STAGE_FQN = "データベース名.スキーマ名.内部ステージ名"
INTERNAL_STAGE_REF = f"@{INTERNAL_STAGE_FQN}"
EXTERNAL_STAGE_FQN = "データベース名.スキーマ名.外部ステージ名"
EXTERNAL_STAGE_REF = f"@{EXTERNAL_STAGE_FQN}"
DELETE_CANDIDATES_KEY = "delete_candidate_names"
def stage_location_for_key(stage_ref: str, relative_key: str) -> str:
file_name = PurePosixPath(relative_key).name
return f"{stage_ref}/{file_name}"
def describe_stages(session) -> bool:
can_desc_stage = True
for label, fqn in (
("内部", INTERNAL_STAGE_FQN),
("外部", EXTERNAL_STAGE_FQN),
):
try:
session.sql(f"DESC STAGE {fqn}").collect()
except Exception as exc:
st.error(
f"{label}ステージ `{fqn}` を参照できません。名前・権限(USAGE 等)を確認してください。"
f"\n\n詳細: {exc}"
)
can_desc_stage = False
return can_desc_stage
def list_stage_files(session):
try:
return session.sql(f"LIST {EXTERNAL_STAGE_REF}").collect()
except Exception as exc:
st.warning(f"ファイル一覧の取得に失敗しました: {exc}")
return []
def format_list_row(row) -> str:
try:
name = row["name"]
except (KeyError, TypeError, IndexError):
name = row[0] if row is not None else ""
if isinstance(name, str) and "/" in name:
return name.split("/", 1)[1]
return str(name)
def main():
st.set_page_config(page_title="ファイルアップローダー", layout="centered")
st.title("ファイルアップローダー")
session = get_active_session()
if not describe_stages(session):
st.stop()
uploaded_file = st.file_uploader(
"アップロードするファイルを選択してください",
help=(
"ブラウザから SiS に読み込んだあと、内部ステージへ put_stream し、"
"copy_files で外部ステージ(S3)へコピーします。内部ステージの一時オブジェクトは "
"成功・失敗にかかわらず finally で削除します。"
),
)
if uploaded_file is not None:
file_name = PurePosixPath(uploaded_file.name).name
tmp_prefix = f"streamlit_upload_tmp/{uuid.uuid4()}/"
internal_dir = f"{INTERNAL_STAGE_REF}/{tmp_prefix}"
internal_file = f"{internal_dir}{file_name}"
try:
file_stream = io.BytesIO(uploaded_file.getvalue())
session.file.put_stream(
file_stream,
internal_file,
auto_compress=False,
overwrite=True,
)
session.file.copy_files(
source=internal_dir,
target_stage_location=f"{EXTERNAL_STAGE_REF}/",
files=[file_name],
detailed_output=True,
)
st.success(f"S3(外部ステージ)へアップロードしました: `{file_name}`")
except Exception as exc:
st.error(f"アップロードに失敗しました: {exc}")
finally:
try:
session.file.remove(internal_dir)
except Exception as cleanup_exc:
st.warning(
f"内部ステージ上の一時ファイルの削除に失敗しました: {cleanup_exc}"
)
with st.expander("アップロード済みファイル一覧", expanded=False):
if st.button("一覧を取得", key="list_stage_files"):
rows = list_stage_files(session)
if rows:
names = [format_list_row(r) for r in rows]
st.dataframe({"name": names}, use_container_width=True, hide_index=True)
else:
st.info(
"ファイルが見つかりません(空、または LIST 権限がありません)。"
)
with st.expander("アップロード済みファイル削除", expanded=False):
if st.button("候補一覧を読み込み", key="load_delete_candidates"):
rows = list_stage_files(session)
st.session_state[DELETE_CANDIDATES_KEY] = (
[format_list_row(r) for r in rows] if rows else []
)
candidates = st.session_state.get(DELETE_CANDIDATES_KEY)
if candidates is None:
st.info("「候補一覧を読み込み」で LIST 結果を読み込んでください。")
elif not candidates:
st.info("削除対象がありません(空、または LIST 権限がありません)。")
else:
selected = st.selectbox(
"削除するオブジェクトキー(ステージ直下の相対パス)",
options=candidates,
key="delete_select",
)
confirm = st.checkbox(
"選択したファイルを削除する(取り消しできません)",
value=False,
key="delete_confirm",
)
if st.button("削除を実行", type="primary", disabled=not confirm):
if selected not in candidates:
st.error("選択が無効です。候補一覧を再読み込みしてください。")
else:
try:
loc = stage_location_for_key(EXTERNAL_STAGE_REF, selected)
session.file.remove(loc)
st.success(f"削除しました: `{selected}`")
st.session_state[DELETE_CANDIDATES_KEY] = [
n for n in candidates if n != selected
]
except Exception as exc:
st.error(f"削除に失敗しました: {exc}")
if __name__ == "__main__":
main()
※ 記事執筆時点だと、コンテナランタイムで実行されているStreamlitアプリからは直接外部ステージにアップロードできませんでした。
技術要素の選定ポイント
機能を実現するため、いくつかの観点で採用する技術要素を選ばないといけない点があり、少しややこしかったです。
まずランタイムはコンテナランタイムを使いました。ウェアハウス実行の場合だと、ストアドプロシージャと同様の制限が適用されるため、ステージ内のオブジェクトが削除できませんでした。
オブジェクトのアップロードは一度内部ステージにアップロードしてから、外部ステージにCOPY FILESしました。例えばSnowsightからもS3の外部ステージにファイルをアップロードできませんが、アプリからも同様に一度内部ステージに上げてから、外部ステージであるクラウドストレージサービスへのコピーをサポートする操作をする必要がありました。ウェアハウスの場合、Snowparkストアドプロシージャ扱いになるため、この操作が可能な点が少し混乱するポイントでした。
StreamlitアプリからS3 APIへリクエストをする際に、認証情報をどう管理するかもポイントでした。今回は、ストレージ統合で設定したIAMロールを使えることが確認できました。
この方法以外には、シークレットに保存した認証情報を使い、boto3でアクセスすることも考えられますが、シークレットの管理が必要になるためストレージ統合のIAMロールで操作できた方がよいですね。
なお、以下はシークレットを使ったAWSへのアクセス例は、以下でBedrockへの例が紹介されています。
以下で検証した内容をご紹介します。
検証の前提
外部ステージの作成
以下のガイドを参考に、S3バケットへの外部ステージを作成しておきました。
ガイドに詳しく方法が記載されているため、ここでは説明を省略します。
Streamlitアプリの機能
以下の3つの機能を持つStreamlitアプリを作成する前提で説明します。
- 外部ステージへのオブジェクトアップロード
- 外部ステージ上のオブジェクトのリスト
- 外部ステージ上のオブジェクトの削除
外部ステージはS3バケットを想定します。SnowflakeアカウントもAWSをクラウドプロバイダーに選んでいることとします。
やってみた
1. ウェアハウス実行のアプリで試した場合
まずはウェアハウス実行のアプリで以下のコードを実行しました。
ウェアハウス実行の実装例
データベース名・スキーマ名・内部ステージ名は自身のものに置き換えてください。
import io
from pathlib import PurePosixPath
import streamlit as st
from snowflake.snowpark.context import get_active_session
STAGE_FQN = "データベース名.スキーマ名.外部ステージ名"
STAGE_REF = f"@{STAGE_FQN}"
DELETE_CANDIDATES_KEY = "delete_candidate_names"
def stage_location_for_key(stage_ref: str, relative_key: str) -> str:
file_name = PurePosixPath(relative_key).name
return f"{stage_ref}/{file_name}"
def describe_stage(session) -> bool:
try:
session.sql(f"DESC STAGE {STAGE_FQN}").collect()
return True
except Exception as exc:
st.error(
"ステージを参照できません。名前・権限(USAGE 等)を確認してください。"
f"\n\n詳細: {exc}"
)
return False
def list_stage_files(session):
try:
return session.sql(f"LIST {STAGE_REF}").collect()
except Exception as exc:
st.warning(f"ファイル一覧の取得に失敗しました: {exc}")
return []
def format_list_row(row) -> str:
try:
name = row["name"]
except (KeyError, TypeError, IndexError):
name = row[0] if row is not None else ""
if isinstance(name, str) and "/" in name:
return name.split("/", 1)[1]
return str(name)
def main():
st.set_page_config(page_title="ファイルアップローダー", layout="centered")
st.title("ファイルアップローダー")
session = get_active_session()
if not describe_stage(session):
st.stop()
uploaded_file = st.file_uploader(
"アップロードするファイルを選択してください",
help="ブラウザから SiS に読み込んだあと、外部ステージ経由で S3 に PUT します。",
)
if uploaded_file is not None:
try:
file_stream = io.BytesIO(uploaded_file.getvalue())
session.file.put_stream(
file_stream,
f"{STAGE_REF}/{uploaded_file.name}",
auto_compress=False,
overwrite=True,
)
st.success(
f"S3(外部ステージ)へアップロードしました: `{uploaded_file.name}`"
)
except Exception as exc:
st.error(f"アップロードに失敗しました: {exc}")
with st.expander("アップロード済みファイル一覧", expanded=False):
if st.button("一覧を取得", key="list_stage_files"):
rows = list_stage_files(session)
if rows:
names = [format_list_row(r) for r in rows]
st.dataframe({"name": names}, use_container_width=True, hide_index=True)
else:
st.info(
"ファイルが見つかりません(空、または LIST 権限がありません)。"
)
with st.expander("アップロード済みファイル削除", expanded=False):
if st.button("候補一覧を読み込み", key="load_delete_candidates"):
rows = list_stage_files(session)
st.session_state[DELETE_CANDIDATES_KEY] = (
[format_list_row(r) for r in rows] if rows else []
)
candidates = st.session_state.get(DELETE_CANDIDATES_KEY)
if candidates is None:
st.info("「候補一覧を読み込み」で LIST 結果を読み込んでください。")
elif not candidates:
st.info("削除対象がありません(空、または LIST 権限がありません)。")
else:
selected = st.selectbox(
"削除するオブジェクトキー(ステージ直下の相対パス)",
options=candidates,
key="delete_select",
)
confirm = st.checkbox(
"選択したファイルを削除する(取り消しできません)",
value=False,
key="delete_confirm",
)
if st.button("削除を実行", type="primary", disabled=not confirm):
if selected not in candidates:
st.error("選択が無効です。候補一覧を再読み込みしてください。")
else:
try:
loc = stage_location_for_key(STAGE_REF, selected)
session.file.remove(loc)
st.success(f"削除しました: `{selected}`")
st.session_state[DELETE_CANDIDATES_KEY] = [
n for n in candidates if n != selected
]
except Exception as exc:
st.error(f"削除に失敗しました: {exc}")
if __name__ == "__main__":
main()
この実装では、st.file_uploaderでファイルをアプリにアップロードし、snowflake.snowpark.FileOperation.put_streamで外部ステージに書き込みます。リストはLISTをSnowpark SQLで実行します。snowflake.snowpark.FileOperation.removeで外部ステージ上のオブジェクトを削除します。
この場合、外部ステージに直接オブジェクトのアップロード・リストは可能ですが、削除が失敗します。
▼アップロードは成功する


▼リストは成功する

▼削除は失敗する

この挙動のポイントとして、ウェアハウスで実行されるStreamlitアプリはストアドプロシージャと同様の制限が適用されるます。
PUTは記事執筆時点で外部テーブルをサポートしていませんが、Snowparkストアドプロシージャからの場合はサポートされます。
一方で、REMOVEはストアドプロシージャから実行できません。この操作は所有者の権限を使用するストアドプロシージャではサポートされておらず、呼び出し元の権限を使用して実行する必要があります。そのためStored procedure execution error: Unsupported statement type 'REMOVE_FILES'.というエラーメッセージが送出されています。
ただし、Understanding owner’s rights and Streamlit in Snowflake appsに記載がある通り、コンテナランタイムで実行している場合は、ストアドプロシージャと同様の制限が適用されなくなるため、同じコードをコンテナランタイムで実行してみました。
2. コンテナランタイムのアプリで試した場合(直接外部ステージにアップロードする)
次に、1で実行したアプリ実装をコンテナ実行のアプリで実行しました。
この場合、外部ステージのオブジェクト削除が可能ですが、アップロードが失敗しました。
▼アップロードは失敗する

▼削除は成功する

この挙動が起こる理由は先ほどの逆で、アプリが所有者権限で実行されるストアドプロシージャと同様の制限が適用されないためREMOVEが可能な一方で、外部ステージへのPUT操作がSnowparkストアドプロシージャからの実行でなくなるためできなくなります。
ただ、この条件でも内部ステージへのPUT操作は可能なため、アップロードをもう一工夫すればよさそうです。
3. コンテナランタイムのアプリで試した場合(内部ステージ経由でアップロードする)
最後に、アプリのアップロード処理を以下のように改善して実行しました。
オブジェクトをS3外部ステージにアップロードする前に、一度内部ステージにアップロードし、外部ステージにCOPYします。
※ 以下は冒頭で紹介した実装例から該当箇所を抜粋しました。
if uploaded_file is not None:
file_name = PurePosixPath(uploaded_file.name).name
tmp_prefix = f"streamlit_upload_tmp/{uuid.uuid4()}/"
internal_dir = f"{INTERNAL_STAGE_REF}/{tmp_prefix}"
internal_file = f"{internal_dir}{file_name}"
try:
file_stream = io.BytesIO(uploaded_file.getvalue())
session.file.put_stream(
file_stream,
internal_file,
auto_compress=False,
overwrite=True,
)
session.file.copy_files(
source=internal_dir,
target_stage_location=f"{EXTERNAL_STAGE_REF}/",
files=[file_name],
detailed_output=True,
)
st.success(f"S3(外部ステージ)へアップロードしました: `{file_name}`")
except Exception as exc:
st.error(f"アップロードに失敗しました: {exc}")
finally:
try:
session.file.remove(internal_dir)
except Exception as cleanup_exc:
st.warning(
f"内部ステージ上の一時ファイルの削除に失敗しました: {cleanup_exc}"
)
この実装であれば、以下のようにアップロードも成功しました。
▼アップロードが成功する


注意点
コンピューティングプールの停止について
ウェアハウスの場合、セッションを切ればウェアハウスがAUTO_SUSPEND値に従って一時停止します。
コンテナの場合は、Streamlitサーバーがコンピューティングプールのノード上で継続的に実行されるため、業務時間外などにより細かく停止したい場合はタスクなどによる停止が必要そうです。
ビューアーが3日間操作を行わないと、Streamlitサーバープロセスが終了し、そこからコンピューティングプールがAUTO_SUSPEND値に従って一時停止します。
また、サスペンド後は起動にも少し時間がかかります。(数分程度)
終わりに
AWS上のSnowflakeから外部ステージとして設定したS3バケットに、Streamlitアプリからオブジェクトをアップロード・削除する例のご紹介でした。
コンテナランタイムであれば、アップロード・削除ともにできましたが、現時点ではアップロードは内部ステージに一度上げてから外部ステージにコピーする必要がありました。
ですが、ストレージ統合に設定したIAMロールの権限でオブジェクト操作ができるため、シークレットの管理も不要でした。









