重要
この機能はパブリック プレビュー段階にあります。
Batch Unity Catalog Python UDF は、データのバッチを操作する Python コードを記述できるようにすることで、Unity カタログ UDF の機能を拡張し、行ごとの UDF に関連するオーバーヘッドを減らすことで効率を大幅に向上させます。 これらの最適化により、Unity カタログ バッチ Python UDF は大規模なデータ処理に最適です。
要求事項
Batch Unity カタログ Python UDF には、Databricks Runtime バージョン 16.3 以降が必要です。
Batch Unity カタログ Python UDF を作成する
Batch Unity カタログ Python UDF の作成は、通常の Unity カタログ UDF の作成に似ていますが、次のものが追加されています。
-
PARAMETER STYLE PANDAS:これは、UDF が pandas 反復子を使用してバッチでデータを処理することを指定します。 -
HANDLER 'handler_function': バッチを処理するために呼び出されるハンドラー関数を指定します。
次の例では、Batch Unity カタログ Python UDF を作成する方法を示します。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
UDF を登録したら、SQL または Python を使用して呼び出すことができます。
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
Batch UDF ハンドラー関数
Batch Unity カタログ Python UDF には、バッチを処理して結果を生成するハンドラー関数が必要です。
HANDLER キーを使用して UDF を作成するときは、ハンドラー関数の名前を指定する必要があります。
ハンドラー関数は、次の処理を行います。
- 1 つ以上の
pandas.Seriesを反復処理する反復子引数を受け取ります。 各pandas.Seriesには、UDF の入力パラメーターが含まれています。 - ジェネレーターを反復処理し、データを処理します。
- ジェネレーター反復子を返します。
Batch Unity カタログ Python UDF は、入力と同じ数の行を返す必要があります。 ハンドラー関数は、各バッチの入力系列と同じ長さの pandas.Series を生成することでこれを保証します。
カスタム依存関係をインストールする
外部ライブラリのカスタム依存関係を定義することで、Databricks ランタイム環境を超えて Batch Unity カタログ Python UDF の機能を拡張できます。
カスタム依存関係 を使用して UDF を拡張するを参照してください。
バッチ UDF は、1 つまたは複数のパラメーターを受け入れる
1 つのパラメーター: ハンドラー関数は、1 つの入力パラメーターを使用すると、バッチごとに pandas.Series を反復処理する反復子を受け取ります。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
複数のパラメーター: 複数の入力パラメーターの場合、ハンドラー関数は複数の pandas.Seriesを反復処理する反復子を受け取ります。 系列内の値は、入力パラメーターと同じ順序になります。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
コストの高い操作を分離してパフォーマンスを最適化する
これらの操作をハンドラー関数から分離することで、計算コストの高い操作を最適化できます。 これにより、データのバッチに対する反復のたびに実行されるのではなく、1 回だけ実行されるようになります。
次の例は、高価な計算が 1 回だけ実行されるようにする方法を示しています。
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
分離の境界とセキュリティ
注
共有分離環境では、Databricks Runtime 17.1 以降が必要です。 以前のバージョンでは、すべての Batch Unity カタログ Python UDF が厳密分離モードで実行されます。
同じ所有者を持つ Batch Unity カタログ Python UDF は、既定で分離環境を共有できます。 これにより、起動する必要がある個別の環境の数を減らすことで、パフォーマンスを向上させ、メモリ使用量を減らすことができます。
厳密な分離
UDF が常に独自の完全に分離された環境で実行されるようにするには、 STRICT ISOLATION 特性句を追加します。
ほとんどの UDF では、厳密な分離は必要ありません。 標準のデータ処理 UDF は、既定の共有分離環境の利点を活用し、メモリ消費量が少ないほど高速に実行できます。
次のSTRICT ISOLATION特性句を次の条件を満たす UDF に追加します。
-
eval()、exec()、または同様の関数を使用してコードとして入力を実行する - ローカル ファイル システムにファイルを書き込む
- グローバル変数またはシステム状態を変更する
- 環境変数へのアクセスまたは変更
次の例は、コードとして入力を実行し、厳密な分離を必要とする UDF を示しています。
CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for code_series in batch_iter:
def eval_func(code):
try:
return str(eval(code))
except Exception as e:
return f"Error: {e}"
yield code_series.apply(eval_func)
$$;
Batch Unity カタログ Python UDF のサービス資格情報
Batch Unity カタログ Python UDF は、Unity カタログ サービス資格情報を使用して外部クラウド サービスにアクセスできます。 これは、セキュリティ トークナイザーなどのクラウド関数をデータ処理ワークフローに統合する場合に特に便利です。
注
サービス資格情報の UDF 固有の API:
UDF では、 databricks.service_credentials.getServiceCredentialsProvider() を使用してサービス資格情報にアクセスします。
これは、UDF 実行コンテキストでは使用できないノートブックで使用される dbutils.credentials.getServiceCredentialsProvider() 関数とは異なります。
サービス資格情報を作成するには、「 サービス資格情報の作成」を参照してください。
UDF 定義の CREDENTIALS 句で使用するサービス資格情報を指定します。
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
サービス資格情報のアクセス許可
UDF 作成者には、Unity カタログ サービスの資格情報に対する ACCESS アクセス許可が必要です。 ただし、UDF 呼び出し元の場合は、UDF に対するアクセス許可 EXECUTE 付与するだけで十分です。 特に、UDF 呼び出し元は、UDF 作成者の資格情報アクセス許可を使用して UDF を実行するため、基になるサービス資格情報にアクセスする必要はありません。
一時関数の場合、作成者は常に呼び出し側です。 No-PE スコープ (専用クラスターとも呼ばれます) で実行される UDF は、代わりに呼び出し元のアクセス許可を使用します。
既定の資格情報とエイリアス
CREDENTIALS句には複数の資格情報を含めることができますが、DEFAULTとしてマークできるのは 1 つだけです。
AS キーワードを使用して、既定以外の資格情報のエイリアスを設定できます。 各資格情報には一意のエイリアスが必要です。
修正プログラムが適用されたクラウド SDK では、既定の資格情報が自動的に取得されます。 既定の資格情報は、コンピューティングの Spark 構成で指定された既定値よりも優先され、Unity カタログ UDF 定義に保持されます。
azure-identity プロバイダーを使用するには、DefaultAzureCredential パッケージをインストールする必要があります。 外部ライブラリをインストールするには、 ENVIRONMENT 句を使用します。 外部ライブラリのインストールの詳細については、「 カスタム依存関係を使用した UDF の拡張」を参照してください。
サービス資格情報の例 - Azure Functions
次の例では、サービス資格情報を使用して、Batch Unity カタログ Python UDF から Azure 関数を呼び出します。
%sql
CREATE OR REPLACE FUNCTION main.test.call_azure_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "requests"]', environment_version = 'None'
)
AS $$
import json
import pandas as pd
import requests
from azure.identity import DefaultAzureCredential
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# DefaultAzureCredential automatically uses the DEFAULT credential
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default")
# Azure Function URL
function_url = "https://your-function-app.azurewebsites.net/api/HashValuesFunction"
# Propagate TaskContext information:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = {
"values": vals.to_list(),
"is_debug": bool(is_debug[0]),
"context": user_ctx
}
headers = {
"Authorization": f"Bearer {token.token}",
"Content-Type": "application/json"
}
response = requests.post(function_url, json=payload, headers=headers)
if response.status_code != 200:
raise Exception(f"Function call failed: {response.text}")
response_data = response.json()
if "errorMessage" in response_data:
raise Exception(str(response_data))
yield pd.Series(response_data["values"])
$$;
登録後に UDF を呼び出します。
SELECT main.test.call_azure_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
タスク実行コンテキストを取得する
TaskContext PySpark API を使用して、ユーザーの ID、クラスター タグ、Spark ジョブ ID などのコンテキスト情報を取得します。 UDF でのタスク コンテキストの取得を参照してください。
関数が一貫した結果を生成する場合に DETERMINISTIC を設定する
同じ入力に対して同じ出力が生成される場合は、関数定義に DETERMINISTIC を追加します。 これにより、クエリの最適化によってパフォーマンスが向上します。
既定では、明示的に宣言されていない限り、Batch Unity カタログ Python UDF は非決定論的であると見なされます。 非決定論的関数の例としては、ランダムな値の生成、現在の時刻または日付へのアクセス、外部 API 呼び出しなどがあります。
CREATE FUNCTION (SQL と Python) を参照してください
制限事項
- Python 関数は
NULL値を個別に処理する必要があり、すべての型マッピングは Azure Databricks SQL 言語マッピングに従う必要があります。 - Batch Unity カタログ Python UDF は、セキュリティで保護された分離された環境で実行され、共有ファイル システムや内部サービスにはアクセスできません。
- ステージ内の複数の UDF 呼び出しがシリアル化され、中間結果が具体化され、ディスクに流出する可能性があります。
- サービス資格情報は、Batch Unity カタログ Python UDF とスカラー Python UDF でのみ使用できます。 これらは、標準の Unity カタログ Python UDF ではサポートされていません。
- 専用クラスターおよび一時関数の場合、関数の呼び出し元には、サービス資格情報に対する
ACCESSアクセス許可が必要です。 「サービス資格情報を使用して外部クラウド サービスにアクセスするためのアクセス許可を付与する」を参照してください。 - パブリック プレビュー機能を有効にします。ワークスペースのプレビュー ページでサーバーレス SQL Warehouse の UDF のネットワークを有効にして、サーバーレス SQL ウェアハウス コンピューティング上の外部サービスに対して Batch Unity カタログ Python UDF 呼び出しを行います。
- サーバーレス ノートブックまたはジョブ コンピューティングで Batch Unity カタログ Python UDF 呼び出しを行うには、サーバーレス エグレス制御を構成する必要があります