Databricks では、Unity カタログを使用して Lakeflow 宣言パイプラインを構成することをお勧めします。 Unity カタログの使用は、新しく作成されたパイプラインの既定値です。
Unity カタログで構成されたパイプラインは、定義されたすべての具体化されたビューとストリーミング テーブルを、指定されたカタログとスキーマに発行します。 Unity カタログ パイプラインは、他の Unity カタログ テーブルとボリュームから読み取ることができます。
Unity カタログ パイプラインによって作成されたテーブルに対するアクセス許可を管理するには、GRANT と REVOKEを使用します。
注
この記事では、パイプラインの現在の既定の発行モードの機能について説明します。 2025 年 2 月 5 日より前に作成されたパイプラインでは、従来の発行モードと LIVE 仮想スキーマが使用される場合があります。
LIVE スキーマ (レガシー)を参照してください。
要件
Unity カタログのターゲット スキーマにストリーミング テーブルと具体化されたビューを作成するには、スキーマと親カタログに対する次のアクセス許可が必要です。
-
USE CATALOGターゲット カタログに対する特権。 - パイプラインで
CREATE MATERIALIZED VIEWを作成する場合は、ターゲット スキーマに対するUSE SCHEMAおよび 権限。 - ターゲット スキーマに対する
CREATE TABLEおよびUSE SCHEMAの特権が必要です。パイプラインがストリーミング テーブルを作成する場合。 - パイプラインで新しいスキーマを作成する場合は、ターゲット カタログに対する
USE CATALOGおよびCREATE SCHEMA特権が必要です。
Unity カタログ対応パイプラインを実行するためのコンピューティング要件:
- コンピューティング リソースは、標準アクセス モードで構成する必要があります。 専用コンピューティングはサポートされていません。 アクセス モードを参照してください。
Unity カタログ (ストリーミング テーブルや具体化されたビューを含む) を使用して Lakeflow 宣言パイプラインによって作成されたテーブルに対してクエリを実行するために必要なコンピューティングには、次のいずれかが含まれます。
- SQL倉庫
- Databricks Runtime 13.3 LTS 以降の標準アクセス モードでのコンピューティング。
- 専用アクセス モード コンピューティング。専用コンピューティングできめ細かいアクセス制御が有効になっている場合 (つまり、Databricks Runtime 15.4 以降で実行されており、ワークスペースに対してサーバーレス コンピューティングが有効になっています)。 詳細については、 専用コンピューティングでのきめ細かなアクセス制御に関するページを参照してください。
- 専用アクセス モードは、テーブル所有者がクエリを実行する場合にのみ、13.3 LTS から 15.3 で計算されます。
追加のコンピューティング制限が適用されます。 次のセクションを参照してください。
制限事項
Lakeflow 宣言パイプラインで Unity カタログを使用する場合の制限事項を次に示します。
- 既定では、Unity カタログ対応パイプラインを実行するコンピューティングからドライバー ログを表示できるのは、パイプライン所有者とワークスペース管理者だけです。 他のユーザーがドライバー ログにアクセスできるようにするには、「 管理者以外のユーザーが Unity カタログ対応パイプラインからドライバー ログを表示するを参照してください。
- Hive メタストアを使用する既存のパイプラインは、Unity Catalog を使用するようにアップグレードすることはできません。 Hive メタストアに書き込む既存のパイプラインを移行するには、新しいパイプラインを作成し、データ ソースからデータを再取り込みする必要があります。 Hive メタストア パイプラインを複製して Unity カタログ パイプラインを作成する
を参照してください。
- Unity カタログパブリック プレビュー中に作成されたメタストアにアタッチされたワークスペースに、Unity カタログ対応パイプラインを作成することはできません。 「特権継承へのアップグレード」を参照してください。
- JAR はサポートされていません。 サードパーティの Python ライブラリのみがサポートされています。 Lakeflow 宣言パイプラインの Python 依存関係の管理に関するページを参照してください。
- ストリーミング テーブルのスキーマを変更するデータ操作言語 (DML) クエリはサポートされていません。
- パイプラインで作成された具体化されたビューは、パイプラインの外部 (たとえば、別のパイプラインやダウンストリーム ノートブック) のストリーミング ソースとして使用することはできません。
- 具体化されたビューとストリーミング テーブルのデータは、包含スキーマの格納場所に格納されます。 スキーマストレージの場所が指定されていない場合、テーブルはカタログストレージの場所に格納されます。 スキーマとカタログのストレージの場所が指定されていない場合、テーブルはメタストアのルート ストレージの場所に格納されます。
- カタログ エクスプローラー History タブには、具体化されたビューの履歴は表示されません。
- テーブルを定義する場合、
LOCATIONプロパティはサポートされません。 - Unity Catalog 対応パイプラインは、Hive メタストアに発行できません。
- Python UDF のサポートはパブリック プレビュー段階です。
注
具体化されたビューをサポートする基になるファイルには、具体化されたビュー定義に表示されないアップストリーム テーブルのデータ (個人を特定できる情報を含む) が含まれる場合があります。 このデータは、具体化されたビューの増分更新をサポートするために、基になるストレージに自動的に追加されます。
具体化されたビューの基になるファイルは、具体化されたビュー スキーマの一部ではないアップストリーム テーブルからデータを公開する可能性があるため、Databricks では、基になるストレージを信頼関係のないダウンストリーム コンシューマーと共有しないことをお勧めします。
たとえば、具体化されたビュー定義に COUNT(DISTINCT field_a) 句が含まれているとします。 具体化されたビュー定義には集計 COUNT DISTINCT 句のみが含まれていますが、基になるファイルには field_a の実際の値の一覧が含まれています。
Hive メタストアと Unity カタログ パイプラインを一緒に使用できますか?
ワークスペースには、Unity カタログと従来の Hive メタストアを使用するパイプラインを含めることができます。 ただし、1 つのパイプラインで Hive メタストアと Unity カタログに書き込むことはできません。 Hive メタストアに書き込む既存のパイプラインは、Unity カタログを使用するようにアップグレードすることはできません。 Hive メタストアに書き込む既存のパイプラインを移行するには、新しいパイプラインを作成し、データ ソースからデータを再取り込みする必要があります。 Hive メタストア パイプラインを複製して Unity カタログ パイプラインを作成する
Unity カタログを使用していない既存のパイプラインは、Unity カタログで構成された新しいパイプラインを作成しても影響を受けません。 これらのパイプラインは、構成されたストレージの場所を使用して Hive メタストアにデータを保持し続けます。
このドキュメントで特に指定しない限り、既存のすべてのデータ ソースと Lakeflow 宣言パイプライン機能は、Unity カタログを使用するパイプラインでサポートされています。 Python と SQL の両方のインターフェイスは、Unity Catalog を使用するパイプラインでサポートされています。
非アクティブなテーブル
Unity カタログにデータを保持するように Lakeflow 宣言型パイプラインが構成されている場合、パイプラインはテーブルのライフサイクルとアクセス許可を管理します。 テーブルの定義がパイプラインから削除されると、テーブルが非アクティブになる可能性があり、パイプラインが削除されると削除されます。
パイプライン ソースからテーブル定義を削除すると、次のパイプライン更新によって、対応する具体化されたビューまたはストリーミング テーブルエントリが非アクティブとしてマークされます。 非アクティブなテーブルに対してクエリを実行することはできますが、パイプラインによって更新されなくなります。 具体化されたビューまたはストリーミング テーブルをクリーンアップするには、テーブルを明示的に DROP します。
- 削除されたテーブルは、
UNDROPコマンドを使用して 7 日以内に復旧できます。 - 次のパイプライン更新時に具体化されたビューまたはストリーミング テーブルエントリが Unity カタログから削除されるレガシ動作を保持するには、パイプライン構成
"pipelines.dropInactiveTables": "true"設定します。 実際のデータは、誤って削除された場合に復旧できるように、一定期間保持されます。 具体化されたビューまたはストリーミング テーブルをパイプライン定義に追加し直すことで、7 日以内にデータを回復できます。
(パイプライン ソースからテーブル定義を削除するのではなく) パイプライン全体を削除すると、そのパイプラインで定義されているすべてのテーブルも削除されます。 Lakeflow 宣言型パイプライン UI では、パイプラインの削除を確認するメッセージが表示されます。
Lakeflow 宣言パイプラインから Unity カタログにテーブルを書き込む
Unity カタログにテーブルを書き込むには、ワークスペースを介して操作するようにパイプラインを構成する必要があります。 パイプラインを作成したらStorage オプションで Unity Catalog を選択しCatalog ドロップダウン メニューでカタログを選択し、既存のスキーマを選択するか、Target スキーマ ドロップダウン メニューに新しいスキーマの名前を入力します。 Unity Catalog カタログの詳細については、「Azure Databricks のカタログとは」を参照してください。 Unity Catalog のスキーマについては、「Azure Databricks のスキーマとは」を参照してください。
Unity Catalog パイプラインにデータを取り込む
Unity Catalog を使用するように構成されたパイプラインは、次の場所からデータを読み取ることができます。
- Unity Catalog のマネージド テーブルと外部テーブル、ビュー、具体化されたビュー、ストリーミング テーブル。
- Hive メタストアのテーブルとビュー。
-
read_files()関数を使用して Unity Catalog の外部の場所から読み取る自動ローダー。 - Apache Kafka と Amazon Kinesis。
Unity Catalog テーブルと Hive メタストア テーブルからの読み取りの例を次に示します。
Unity Catalog テーブルからのバッチ インジェスト
SQL
CREATE OR REFRESH MATERIALIZED VIEW
table_name
AS SELECT
*
FROM
my_catalog.my_schema.table1;
Python
@dp.materialized_view
def table_name():
return spark.read.table("my_catalog.my_schema.table")
Unity Catalog テーブルから変更をストリーム配信する
SQL
CREATE OR REFRESH STREAMING TABLE
table_name
AS SELECT
*
FROM
STREAM(my_catalog.my_schema.table1);
Python
@dp.table
def table_name():
return spark.readStream.table("my_catalog.my_schema.table")
Hive メタストアからデータを取り込む
Unity Catalog を使用するパイプラインでは、hive_metastore カタログを使用して Hive メタストア テーブルからデータを読み取ることができます。
SQL
CREATE OR REFRESH MATERIALIZED VIEW
table_name
AS SELECT
*
FROM
hive_metastore.some_schema.table;
Python
@dp.materialized_view
def table3():
return spark.read.table("hive_metastore.some_schema.table")
自動ローダーからデータを取り込む
SQL
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"/path/to/uc/external/___location",
format => "json"
)
Python
@dp.table(table_properties={"quality": "bronze"})
def table_name():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(f"{path_to_uc_external_location}")
)
具体化されたビューを共有する
既定では、パイプライン所有者のみが、パイプラインによって作成されたデータセットに対してクエリを実行するアクセス許可を持ちます。 GRANT ステートメントを使用して他のユーザーがテーブルに対してクエリを実行できるようにしたり、REVOKE ステートメントを使用してクエリ アクセスを取り消したりすることができます。 Unity Catalog での権限の詳細については、「Unity Catalog の権限の管理」を参照してください。
テーブルに SELECT 権限を付与する
GRANT SELECT ON TABLE
my_catalog.my_schema.table_name
TO
`user@databricks.com`
テーブルの SELECT 権限を取り消す
REVOKE SELECT ON TABLE
my_catalog.my_schema.table_name
FROM
`user@databricks.com`
テーブルの作成または具体化されたビューの作成権限を付与する
GRANT CREATE { MATERIALIZED VIEW | TABLE } ON SCHEMA
my_catalog.my_schema
TO
{ principal | user }
パイプラインの系列を表示する
Lakeflow 宣言パイプラインのテーブルの系列は、カタログ エクスプローラーに表示されます。 カタログ エクスプローラーの系列 UI には、Unity カタログ対応パイプラインの具体化されたビューまたはストリーミング テーブルのアップストリームテーブルとダウンストリーム テーブルが表示されます。 Unity カタログ系列の詳細については、Unity カタログを使用したデータ系列の表示に関するページを参照してください。
Unity カタログ対応パイプラインの具体化されたビューまたはストリーミング テーブルの場合、現在のワークスペースからパイプラインにアクセスできる場合、カタログ エクスプローラーの系列 UI は、具体化されたビューまたはストリーミング テーブルを生成したパイプラインにもリンクします。
ストリーミング テーブルのデータを追加、変更、または削除する
データ操作言語 (DML) ステートメント (挿入、更新、削除、マージステートメントなど) を使用して、Unity カタログに発行されたストリーミング テーブルを変更できます。 ストリーミング テーブルに対する DML クエリのサポートにより、一般データ保護規則 (GDPR) に準拠するためにテーブルを更新するなどのユース ケースが可能になります。
注
- ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
- ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。
skipChangeCommitsが設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理にストリーミング テーブルが必要ない場合は、具体化されたビュー (追加専用の制限がない) をターゲット テーブルとして使用できます。
ストリーミング テーブル内のレコードを変更する DML ステートメントの例を次に示します。
特定の ID を持つレコードを削除します。
DELETE FROM my_streaming_table WHERE id = 123;
特定の ID を持つレコードを更新します。
UPDATE my_streaming_table SET name = 'Jane Doe' WHERE id = 123;
行フィルターと列マスクを使用してテーブルを発行する
Important
この機能は パブリック プレビュー段階です。
行フィルターを使用すると、テーブル スキャンで行がフェッチされるたびにフィルターとして適用される関数を指定できます。 これらのフィルターにより、後続のクエリでフィルター述語が true と評価される行のみが返されるようになります。
列マスクを 使用すると、テーブル スキャンが行をフェッチするたびに列の値をマスクできます。 その列の今後のクエリでは、列の元の値ではなく、評価された関数の結果が返されます。 行フィルターと列マスクの使用の詳細については、「 行フィルターと列マスク」を参照してください。
行フィルターと列マスクの管理
具体化されたビューとストリーミング テーブルの行フィルターと列マスクは、CREATE OR REFRESH ステートメントを通じて追加、更新、または削除する必要があります。
行フィルターと列マスクを使用してテーブルを定義する方法の詳細な構文については、「 Lakeflow 宣言パイプライン SQL 言語リファレンス 」および 「Lakeflow 宣言型パイプライン Python 言語リファレンス」を参照してください。
行動
Lakeflow 宣言パイプラインで行フィルターまたは列マスクを使用する場合の重要な詳細を次に示します。
-
所有者として更新: パイプラインの更新で具体化されたビューまたはストリーミング テーブルが更新されると、行フィルター関数と列マスク関数はパイプライン所有者の権限で実行されます。 つまり、テーブルの更新では、パイプラインを作成したユーザーのセキュリティ コンテキストが使用されます。 ユーザー コンテキストを確認する関数 (
CURRENT_USERやIS_MEMBERなど) は、パイプライン所有者のユーザー コンテキストを使用して評価されます。 -
クエリ: 具体化されたビューまたはストリーミング テーブルに対してクエリを実行する場合、ユーザー コンテキスト (
CURRENT_USERやIS_MEMBERなど) をチェックする関数は、呼び出し側のユーザー コンテキストを使用して評価されます。 この方法では、現在のユーザーのコンテキストに基づいて、ユーザー固有のデータ セキュリティとアクセス制御が適用されます。 - 行フィルターと列マスクを含むソース テーブルに対する具体化されたビューを作成する場合、具体化されたビューの更新は常に完全更新になります。 完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 このプロセスでは、ソース テーブルのセキュリティ ポリシーが最新のデータと定義に基づいて評価され、適用されていることを確認します。
Observability
DESCRIBE EXTENDED、INFORMATION_SCHEMA、またはカタログ エクスプローラーを使用して、特定の具体化されたビューまたはストリーミング テーブルに適用される既存の行フィルターと列マスクを調べます。 この機能により、ユーザーは具体化されたビューとストリーミング テーブルのデータ アクセスと保護対策を監査および確認できます。