이 문서에는 Python UDF(사용자 정의 함수) 예제가 포함되어 있습니다. UDF를 등록하는 방법, UDF를 호출하는 방법을 보여 주고 Spark SQL에서 하위 식의 평가 순서에 대한 주의 사항을 제공합니다.
Databricks Runtime 14.0 이상에서는 Python UDF(사용자 정의 테이블 함수)를 사용하여 스칼라 값 대신 전체 관계를 반환하는 함수를 등록할 수 있습니다. Python UDF(사용자 정의 테이블 함수)참조하세요.
참고
Databricks Runtime 12.2 LTS 이하에서는 표준 액세스 모드를 사용하는 Unity 카탈로그 컴퓨팅에서 Python UDF 및 Pandas UDF가 지원되지 않습니다. 스칼라 Python UDF 및 Pandas UDF는 모든 액세스 모드에 대해 Databricks Runtime 13.3 LTS 이상에서 지원됩니다.
Databricks Runtime 13.3 LTS 이상에서는 SQL 구문을 사용하여 Unity 카탈로그에 스칼라 Python UDF를 등록할 수 있습니다. Unity 카탈로그UDF(사용자 정의 함수)
UDF로 함수 등록
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
필요에 따라 UDF의 반환 형식을 설정할 수 있습니다. 기본 반환 형식은 StringType
입니다.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQL에서 UDF 호출
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
DataFrames에서 UDF 사용
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
또는 주석 구문을 사용하여 동일한 UDF를 선언할 수 있습니다.
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
평가 순서 및 Null 검사
Spark SQL(SQL과 DataFrame 및 데이터 세트 API 포함)은 하위 식의 평가 순서를 보장하지 않습니다. 특히 연산자 또는 함수 입력이 반드시 왼쪽에서 오른쪽 또는 다른 고정 순서로 평가되는 것은 아닙니다. 예를 들어, 논리적 AND
및 OR
표현식에는 왼쪽에서 오른쪽으로의 "쇼트 서킷" 의미 체계가 없습니다.
따라서 쿼리 최적화 및 계획 중에 식과 절의 순서가 변경될 수 있으므로 부울 식의 부작용 또는 평가 순서와 WHERE
및 HAVING
절의 순서에 의존하는 것은 위험합니다. 특히 UDF가 Null 검사를 위해 SQL의 단락 의미 체계를 사용하는 경우 UDF를 호출하기 전에 null 검사가 수행된다는 보장은 없습니다. 예를 들면 다음과 같습니다.
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
여기서 WHERE
절은 Null을 필터링한 후 strlen
UDF가 호출되도록 보장하지 않습니다.
적절한 Null 검사를 수행하려면 다음 중 하나를 수행하는 것이 좋습니다.
- UDF 자체가 Null을 인식하도록 설정하고 UDF 내에서 Null 검사를 수행합니다.
-
IF
또는CASE WHEN
식을 사용하여 Null 검사를 수행하고 조건부 분기에서 UDF를 호출합니다.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Scalar Python UDF의 서비스 자격 증명
스칼라 Python UDF는 Unity 카탈로그 서비스 자격 증명을 사용하여 외부 클라우드 서비스에 안전하게 액세스할 수 있습니다. 클라우드 기반 토큰화, 암호화 또는 비밀 관리와 같은 작업을 데이터 변환에 직접 통합하는 데 유용합니다.
스칼라 Python UDF에 대한 서비스 자격 증명은 SQL 웨어하우스 및 일반 컴퓨팅에서만 지원됩니다.
서비스 자격 증명을 만들려면 서비스 자격 증명 만들기를 참조하세요.
서비스 자격 증명에 액세스하려면 UDF 논리의 databricks.service_credentials.getServiceCredentialsProvider()
유틸리티를 사용하여 적절한 자격 증명으로 클라우드 SDK를 초기화합니다. 모든 코드는 UDF 본문에 캡슐화되어야 합니다.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
서비스 자격 증명 권한
UDF 작성자는 Unity 카탈로그 서비스 자격 증명에 대한 ACCESS 권한이 있어야 합니다.
전용 클러스터라고도 하는 No-PE 범위에서 실행되는 UDF에는 서비스 자격 증명에 대한 MANAGE 권한이 필요합니다.
기본 자격 증명
스칼라 Python UDF에서 사용되는 경우 Databricks는 컴퓨팅 환경 변수의 기본 서비스 자격 증명을 자동으로 사용합니다. 이 동작을 사용하면 UDF 코드에서 자격 증명 별칭을 명시적으로 관리하지 않고도 외부 서비스를 안전하게 참조할 수 있습니다. 컴퓨팅 리소스에 대한 기본 서비스 자격 증명 지정을 참조하세요.
기본 자격 증명 지원은 표준 및 전용 액세스 모드 클러스터에서만 사용할 수 있습니다. DBSQL에서는 사용할 수 없습니다.
azure-identity
패키지를 설치하여 DefaultAzureCredential
제공자를 사용하십시오. 패키지를 설치하려면 Notebook 범위 내 Python 라이브러리 또는 컴퓨터 범위 내 라이브러리를 참조하세요.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
작업 실행 컨텍스트 가져오기
TaskContext PySpark API를 사용하여 사용자 정보, 클러스터 태그, 스파크 작업 ID 등과 같은 컨텍스트 정보를 얻으세요. UDF에서 작업 컨텍스트 가져오기를 참조하십시오.
제한 사항
PySpark UDF에는 다음과 같은 제한 사항이 적용됩니다.
파일 액세스 제한 사항: Databricks Runtime 14.2 이하에서 공유 클러스터의 PySpark UDF는 Git 폴더, 작업 영역 파일 또는 Unity 카탈로그 볼륨에 액세스할 수 없습니다.
브로드캐스트 변수: 표준 액세스 모드 클러스터 및 서버리스 컴퓨팅의 PySpark UDF는 브로드캐스트 변수를 지원하지 않습니다.
서비스 자격 증명: 서비스 자격 증명은 Batch Unity 카탈로그 Python UDF 및 스칼라 Python UDF에서만 사용할 수 있습니다. 표준 Unity 카탈로그 Python UDF에서는 지원되지 않습니다.
서비스 자격 증명: 서버리스 또는 전용 컴퓨팅에서는 서비스 자격 증명이 지원되지 않습니다.
- 서버리스의 메모리 제한: 서버리스 컴퓨팅의 PySpark UDF의 메모리 제한은 PySpark UDF당 1GB입니다. 이 제한을 초과하면 UDF_PYSPARK_USER_CODE_ERROR.MEMORY_LIMIT_SERVERLESS 형식의 오류가 발생합니다.
- 표준 액세스 모드의 메모리 제한: 표준 액세스 모드의 PySpark UDF에는 선택한 인스턴스 유형의 사용 가능한 메모리에 따라 메모리 제한이 있습니다. 사용 가능한 메모리를 초과하면 형식 UDF_PYSPARK_USER_CODE_ERROR 오류가 발생합니다 . MEMORY_LIMIT.