다음을 통해 공유


UDF에서 작업 컨텍스트 가져오기

Batch Unity 카탈로그 Python UDF 또는 PySpark UDF를 실행하는 동안 TaskContext PySpark API를 사용하여 컨텍스트 정보를 가져옵니다.

예를 들어 사용자의 ID 및 클러스터 태그와 같은 컨텍스트 정보는 외부 서비스에 액세스하기 위해 사용자의 ID를 확인할 수 있습니다.

요구 사항

TaskContext를 사용하여 컨텍스트 정보 가져오기

탭을 선택하여 PySpark UDF 또는 Batch Unity 카탈로그 Python UDF에 대한 TaskContext 예제를 확인합니다.

파이스파크 UDF

다음 PySpark UDF 예제에서는 사용자의 컨텍스트를 인쇄합니다.

@udf
def log_context():
  import json
  from pyspark.taskcontext import TaskContext
  tc = TaskContext.get()

  # Returns current user executing the UDF
  session_user = tc.getLocalProperty("user")

  # Returns cluster tags
  tags = dict(item.values() for item in json.loads(tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags  ") or "[]"))

  # Returns current version details
  current_version = {
    "dbr_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion"),
    "dbsql_version": tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
  }

  return {
    "user": session_user,
    "job_group_id": job_group_id,
    "tags": tags,
    "current_version": current_version
  }

Batch Unity 카탈로그 Python UDF

다음 Batch Unity 카탈로그 Python UDF 예제는 서비스 자격 증명을 사용하여 AWS 람다 함수를 호출하는 사용자의 ID를 가져옵니다.

%sql
CREATE OR REPLACE FUNCTION main.test.call_lambda_func(data STRING, debug BOOLEAN) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
  `batch-udf-service-creds-example-cred` DEFAULT
)
AS $$
import boto3
import json
import pandas as pd
import base64
from pyspark.taskcontext import TaskContext


def batchhandler(it):
  # Automatically picks up DEFAULT credential:
  session = boto3.Session()

  client = session.client("lambda", region_name="us-west-2")

  # Can propagate TaskContext information to lambda context:
  user_ctx = {"custom": {"user": TaskContext.get().getLocalProperty("user")}}

  for vals, is_debug in it:
    payload = json.dumps({"values": vals.to_list(), "is_debug": bool(is_debug[0])})

    res = client.invoke(
      FunctionName="HashValuesFunction",
      InvocationType="RequestResponse",
      ClientContext=base64.b64encode(json.dumps(user_ctx).encode("utf-8")).decode(
        "utf-8"
      ),
      Payload=payload,
    )

    response_payload = json.loads(res["Payload"].read().decode("utf-8"))
    if "errorMessage" in response_payload:
      raise Exception(str(response_payload))

    yield pd.Series(response_payload["values"])
$$;

UDF가 등록된 후 호출하십시오.

SELECT main.test.call_lambda_func(data, false)
FROM VALUES
('abc'),
('def')
AS t(data)

TaskContext 속성

메서드 TaskContext.getLocalProperty() 에는 다음과 같은 속성 키가 있습니다.

속성 키 설명 예제 사용
user 현재 UDF를 실행하는 사용자 tc.getLocalProperty("user")
->"alice"
spark.jobGroup.id 현재 UDF와 연결된 Spark 작업 그룹 ID tc.getLocalProperty("spark.jobGroup.id")
->"jobGroup-92318"
spark.databricks.clusterUsageTags.clusterAllTags 클러스터 메타데이터 태그를 JSON 사전 형식의 문자열로 표현된 키-값 쌍으로 구성합니다. tc.getLocalProperty("spark.databricks.clusterUsageTags.clusterAllTags")
->[{"Department": "Finance"}]
spark.databricks.clusterUsageTags.region 작업 영역이 있는 지역 tc.getLocalProperty("spark.databricks.clusterUsageTags.region")
->"us-west-2"
accountId 실행 중인 컨텍스트에 대한 Databricks 계정 ID tc.getLocalProperty("accountId")
->"1234567890123456"
orgId 작업 영역 ID(DBSQL에서는 사용할 수 없음) tc.getLocalProperty("orgId")
->"987654321"
spark.databricks.clusterUsageTags.sparkVersion 클러스터에 대한 Databricks 런타임 버전(비 DBSQL 환경에서) tc.getLocalProperty("spark.databricks.clusterUsageTags.sparkVersion")
->"16.3"
spark.databricks.clusterUsageTags.dbsqlVersion DBSQL 버전 (DBSQL 환경에서) tc.getLocalProperty("spark.databricks.clusterUsageTags.dbsqlVersion")
->"2024.35"