次の方法で共有


Unity カタログの Python ユーザー定義テーブル関数 (UDF)

Important

Unity カタログへの Python UDF の登録は 、パブリック プレビュー段階です

Unity Catalog ユーザー定義テーブル関数 (UDTF) を使用すると、スカラー値の代わりに完全なテーブルを返す関数を登録できます。 各呼び出しから 1 つの結果値を返すスカラー関数とは異なり、UDF は SQL ステートメントの FROM 句で呼び出され、複数の行と列を返すことができます。

UDF は、次の場合に特に役立ちます。

  • 配列または複雑なデータ構造を複数の行に変換する
  • 外部 API またはサービスを SQL ワークフローに統合する
  • カスタム データ生成またはエンリッチメント ロジックの実装
  • 複数行にわたるステートフル操作を必要とするデータの処理

各 UDTF 呼び出しでは、0 個以上の引数を受け取ることができます。 これらの引数には、入力テーブル全体を表すスカラー式またはテーブル引数を指定できます。

UDF は、次の 2 つの方法で登録できます。

  • Unity カタログ: UDTF を Unity カタログの管理オブジェクトとして登録します。
  • セッション スコープ: 現在のノートブックまたはジョブに分離されたローカル SparkSessionに登録します。 Python ユーザー定義テーブル関数 (UDF) を参照してください。

Requirements

Unity Catalog Python UDF は、次のコンピューティングの種類でサポートされています。

  • 標準アクセス モードのクラシック コンピューティング (Databricks Runtime 17.1 以降)
  • SQL ウェアハウス (サーバーレスまたはプロ)

Unity カタログで UDTF を作成する

SQL DDL を使用して、Unity カタログに管理された UDTF を作成します。 UDF は、SQL ステートメントの FROM 句を使用して呼び出されます。

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Databricks は Python UDF を Python クラスとして実装し、出力行を生成する必須の eval メソッドを使用します。

環境の分離

共有分離環境では、Databricks Runtime 17.2 以降が必要です。 以前のバージョンでは、すべての Unity カタログ Python UDF が厳密分離モードで実行されます。

同じ所有者とセッションを持つ Unity Catalog Python UDF は、既定で分離環境を共有できます。 これにより、起動する必要がある個別の環境の数を減らすことで、パフォーマンスが向上し、メモリ使用量が削減されます。

厳密な分離

UDTF が常に独自の完全に分離された環境で実行されるようにするには、 STRICT ISOLATION 特性句を追加します。

ほとんどの UDF では、厳密な分離は必要ありません。 Standard データ処理 UDF は、既定の共有分離環境の利点を活用し、メモリ消費量を減らしながら高速に実行できます。

STRICT ISOLATION特性句を UDF に追加します。

  • eval()exec()、または同様の関数を使用して、コードとして入力を実行します。
  • ローカル ファイル システムにファイルを書き込みます。
  • グローバル変数またはシステム状態を変更します。
  • 環境変数にアクセスまたは変更します。

次の UDTF の例では、カスタム環境変数を設定し、変数を読み取り戻し、変数を使用して数値のセットを乗算します。 UDTF はプロセス環境を変更するため、 STRICT ISOLATIONで実行します。 そうしないと、同じ環境内の他の UDF/UDF の環境変数がリークまたはオーバーライドされ、不適切な動作が発生する可能性があります。

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

実際の例

次の例は、単純なデータ変換から複雑な外部統合に進む、Unity カタログ Python UDF の実際のユース ケースを示しています。

例: 再実装 explode

Spark には組み込みの explode 関数が用意されていますが、独自のバージョンを作成すると、1 つの入力を取得し、複数の出力行を生成する基本的な UDTF パターンが示されます。

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

SQL クエリで関数を直接使用します。

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
|| element |
+---------+
|| apple   |
|| banana  |
|| cherry  |
+---------+

または、 LATERAL 結合を使用して既存のテーブル データに適用します。

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

例: REST API を使用した IP アドレスの位置情報

この例では、UDF が外部 API を SQL ワークフローに直接統合する方法を示します。 アナリストは、個別の ETL プロセスを必要とするのではなく、使い慣れた SQL 構文を使用してリアルタイムの API 呼び出しでデータを強化できます。

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

Python UDF では、標準アクセス モードで構成されたサーバーレス コンピューティングまたはコンピューティングを使用する場合、ポート 80、443、53 経由の TCP/UDP ネットワーク トラフィックが許可されます。

この関数を使用して、地理情報を使用して Web ログ データを強化します。

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

この方法では、事前に処理された参照テーブルや個別のデータ パイプラインを必要とせずに、リアルタイムの地理分析が可能になります。 UDTF は HTTP 要求、JSON 解析、エラー処理を処理し、標準の SQL クエリを介して外部データ ソースにアクセスできるようにします。

関数が一貫した結果を生成する場合に DETERMINISTIC を設定する

同じ入力に対して同じ出力が生成される場合は、関数定義に DETERMINISTIC を追加します。 これにより、クエリの最適化によってパフォーマンスが向上します。

既定では、明示的に宣言されていない限り、Batch Unity カタログ Python UDF は非決定論的であると見なされます。 非決定論的関数の例としては、ランダムな値の生成、現在の時刻または日付へのアクセス、外部 API 呼び出しなどがあります。

CREATE FUNCTION (SQL と Python) を参照してください

制限事項

Unity カタログ Python UDF には、次の制限事項が適用されます。

次のステップ