Batch Unity 카탈로그 Python UDF 또는 PySpark UDF를 실행하는 동안 TaskContext PySpark API를 사용하여 컨텍스트 정보를 가져옵니다.
예를 들어 사용자의 ID 및 클러스터 태그와 같은 컨텍스트 정보는 외부 서비스에 액세스하기 위해 사용자의 ID를 확인할 수 있습니다.
요구 사항
TaskContext는 Databricks 런타임 버전 16.3 이상에서 지원됩니다.
TaskContext는 다음 UDF 유형에서 지원됩니다.
TaskContext를 사용하여 컨텍스트 정보 가져오기
탭을 선택하여 PySpark UDF 또는 Batch Unity 카탈로그 Python UDF에 대한 TaskContext 예제를 확인합니다.
파이스파크 UDF
다음 PySpark UDF 예제에서는 사용자의 컨텍스트를 인쇄합니다.
@udf
def log_context():
import json
from pyspark.taskcontext import TaskContext
tc = TaskContext.get()
# Returns current user executing the UDF
session_user = tc.getLocalProperty("user")
# Returns cluster tags
tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags ") or "[]"))
# Returns current version details
current_version = {
"dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
"dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
}
return {
"user": session_user,
"job_group_id": job_group_id,
"tags": tags,
"current_version": current_version
}
Batch Unity 카탈로그 Python UDF
다음 Batch Unity 카탈로그 Python UDF 예제는 서비스 자격 증명을 사용하여 AWS 람다 함수를 호출하는 사용자의 ID를 가져옵니다.
%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext
def batchhandler(it):
# Automatically picks up DEFAULT credential:
session = boto3.Session()
client = session.client("lambda", region_name="us-west-2")
# Can propagate TaskContext information to lambda context:
user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}
for vals, is_debug in it:
payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})
res = client.invoke(
FunctionName="HashValuesFunction",
InvocationType="RequestResponse",
ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
"utf-8"
),
Payload=payload,
)
response_payload = json.loads(res["Payload"].read().decode("utf-8"))
if "errorMessage" in response_payload:
raise Exception(str(response_payload))
yield pd.Series(response_payload["values"])
$$;
UDF가 등록된 후 호출하십시오.
SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)
TaskContext 속성
메서드 TaskContext.getLocalProperty()
에는 다음과 같은 속성 키가 있습니다.
속성 키 | 설명 | 예제 사용 |
---|---|---|
user |
현재 UDF를 실행하는 사용자 | tc.getLocalProperty("user") -> "alice" |
spark.jobGroup.id |
현재 UDF와 연결된 Spark 작업 그룹 ID | tc.getLocalProperty("spark.jobGroup.id") -> "jobGroup-92318" |
spark.databricks.clusterUsageTags.clusterAllTags |
클러스터 메타데이터 태그를 JSON 사전 형식의 문자열로 표현된 키-값 쌍으로 구성합니다. | tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags") -> [{"Department": "Finance"}] |
spark.databricks.clusterUsageTags.region |
작업 영역이 있는 지역 | tc.getLocalProperty("spark.databricks.clusterUsageTags.region") -> "us-west-2" |
accountId |
실행 중인 컨텍스트에 대한 Databricks 계정 ID | tc.getLocalProperty("accountId") -> "1234567890123456" |
orgId |
작업 영역 ID(DBSQL에서는 사용할 수 없음) | tc.getLocalProperty("orgId") -> "987654321" |
spark.databricks.clusterUsageTags.sparkVersion |
클러스터에 대한 Databricks 런타임 버전(비 DBSQL 환경에서) | tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion") -> "16.3" |
spark.databricks.clusterUsageTags.dbsqlVersion |
DBSQL 버전 (DBSQL 환경에서) | tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion") -> "2024.35" |