Important
Unity カタログへの Python UDF の登録は 、パブリック プレビュー段階です。
Unity Catalog ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりに完全なテーブルを返す関数を登録できます。 各呼び出しから 1 つの結果値を返すスカラー関数とは異なり、UDF は SQL ステートメントの FROM 句で呼び出され、複数の行と列を返すことができます。
UDF は、次の場合に特に役立ちます。
- 配列または複雑なデータ構造を複数の行に変換する
- 外部 API またはサービスを SQL ワークフローに統合する
- カスタム データ生成またはエンリッチメント ロジックの実装
- 複数行にわたるステートフル操作を必要とするデータの処理
各 UDTF 呼び出しでは、0 個以上の引数を受け取ることができます。 これらの引数には、入力テーブル全体を表すスカラー式またはテーブル引数を指定できます。
UDF は、次の 2 つの方法で登録できます。
- Unity カタログ: UDTF を Unity カタログの管理オブジェクトとして登録します。
- セッション スコープ: 現在のノートブックまたはジョブに分離されたローカル
SparkSessionに登録します。 Python ユーザー定義テーブル関数 (UDF) を参照してください。
Requirements
Unity Catalog Python UDF は、次のコンピューティングの種類でサポートされています。
- 標準アクセス モードのクラシック コンピューティング (Databricks Runtime 17.1 以降)
- SQL ウェアハウス (サーバーレスまたはプロ)
Unity カタログで UDTF を作成する
SQL DDL を使用して、Unity カタログに管理された UDTF を作成します。 UDF は、SQL ステートメントの FROM 句を使用して呼び出されます。
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Databricks は Python UDF を Python クラスとして実装し、出力行を生成する必須の eval メソッドを使用します。
環境の分離
注
共有分離環境では、Databricks Runtime 17.2 以降が必要です。 以前のバージョンでは、すべての Unity カタログ Python UDF が厳密分離モードで実行されます。
同じ所有者とセッションを持つ Unity Catalog Python UDF は、既定で分離環境を共有できます。 これにより、起動する必要がある個別の環境の数を減らすことで、パフォーマンスが向上し、メモリ使用量が削減されます。
厳密な分離
UDTF が常に独自の完全に分離された環境で実行されるようにするには、 STRICT ISOLATION 特性句を追加します。
ほとんどの UDF では、厳密な分離は必要ありません。 Standard データ処理 UDF は、既定の共有分離環境の利点を活用し、メモリ消費量を減らしながら高速に実行できます。
STRICT ISOLATION特性句を UDF に追加します。
-
eval()、exec()、または同様の関数を使用して、コードとして入力を実行します。 - ローカル ファイル システムにファイルを書き込みます。
- グローバル変数またはシステム状態を変更します。
- 環境変数にアクセスまたは変更します。
次の UDTF の例では、カスタム環境変数を設定し、変数を読み取り戻し、変数を使用して数値のセットを乗算します。 UDTF はプロセス環境を変更するため、 STRICT ISOLATIONで実行します。 そうしないと、同じ環境内の他の UDF/UDF の環境変数がリークまたはオーバーライドされ、不適切な動作が発生する可能性があります。
CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os
class Multiplier:
def eval(self, factor: str):
# Save the factor as an environment variable
os.environ["FACTOR"] = factor
# Read it back and convert it to a number
scale = int(os.getenv("FACTOR", "1"))
# Multiply 0 through 4 by the factor
for i in range(5):
yield (i, i * scale)
$$;
SELECT * FROM multiply_numbers("3");
実際の例
次の例は、単純なデータ変換から複雑な外部統合に進む、Unity カタログ Python UDF の実際のユース ケースを示しています。
例: 再実装 explode
Spark には組み込みの explode 関数が用意されていますが、独自のバージョンを作成すると、1 つの入力を取得し、複数の出力行を生成する基本的な UDTF パターンが示されます。
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
SQL クエリで関数を直接使用します。
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
|| element |
+---------+
|| apple |
|| banana |
|| cherry |
+---------+
または、 LATERAL 結合を使用して既存のテーブル データに適用します。
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
例: REST API を使用した IP アドレスの位置情報
この例では、UDF が外部 API を SQL ワークフローに直接統合する方法を示します。 アナリストは、個別の ETL プロセスを必要とするのではなく、使い慣れた SQL 構文を使用してリアルタイムの API 呼び出しでデータを強化できます。
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Return nothing if the API request fails
return
$$;
注
Python UDF では、標準アクセス モードで構成されたサーバーレス コンピューティングまたはコンピューティングを使用する場合、ポート 80、443、53 経由の TCP/UDP ネットワーク トラフィックが許可されます。
この関数を使用して、地理情報を使用して Web ログ データを強化します。
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
この方法では、事前に処理された参照テーブルや個別のデータ パイプラインを必要とせずに、リアルタイムの地理分析が可能になります。 UDTF は HTTP 要求、JSON 解析、エラー処理を処理し、標準の SQL クエリを介して外部データ ソースにアクセスできるようにします。
関数が一貫した結果を生成する場合に DETERMINISTIC を設定する
同じ入力に対して同じ出力が生成される場合は、関数定義に DETERMINISTIC を追加します。 これにより、クエリの最適化によってパフォーマンスが向上します。
既定では、明示的に宣言されていない限り、Batch Unity カタログ Python UDF は非決定論的であると見なされます。 非決定論的関数の例としては、ランダムな値の生成、現在の時刻または日付へのアクセス、外部 API 呼び出しなどがあります。
CREATE FUNCTION (SQL と Python) を参照してください
制限事項
Unity カタログ Python UDF には、次の制限事項が適用されます。
- ポリモーフィック なテーブル関数はサポートされていません。
- TABLE引数はサポートされていません。
- Unity カタログ サービスの資格情報 はサポートされていません。
- カスタム依存関係 はサポートされていません。