중요하다
Lakeflow 선언적 파이프라인 sink
API는 공개 미리 보기로 제공됩니다.
이 문서에서는 Lakeflow 선언적 파이프라인 sink
API와 플로우 를 사용하여 파이프라인에 의해 변환된 레코드를 외부 데이터 싱크에 쓰는 방법을 설명합니다. 외부 데이터 싱크에는 Unity 카탈로그 관리 테이블과 외부 테이블, Apache Kafka 또는 Azure Event Hubs와 같은 이벤트 스트리밍 서비스가 포함됩니다.
메모
Lakeflow 선언적 파이프라인 sink
API는 Python에서만 사용할 수 있습니다.
Lakeflow 선언적 파이프라인 싱크란?
Lakeflow 선언적 파이프라인 싱크는 Lakeflow 선언적 파이프라인 흐름의 대상입니다. 기본적으로 Lakeflow 선언적 파이프라인 흐름은 스트리밍 테이블 또는 구체화된 뷰 대상으로 데이터를 내보냅니다. 둘 다 Azure Databricks 관리 델타 테이블입니다. Lakeflow 선언적 파이프라인 싱크는 Apache Kafka 또는 Azure Event Hubs와 같은 이벤트 스트리밍 서비스 및 Unity 카탈로그에서 관리하는 외부 테이블과 같은 대상에 변환된 데이터를 쓰는 데 사용하는 대체 대상입니다. 싱크를 사용하면 이제 Lakeflow 선언적 파이프라인의 출력을 유지하기 위한 더 많은 옵션이 제공됩니다.
Lakeflow 선언적 파이프라인 싱크는 언제 사용해야 하나요?
Databricks는 필요하다면 Lakeflow 명시적 파이프라인 싱크를 사용할 것을 권장합니다.
- 사기 감지, 실시간 분석 및 고객 권장 사항과 같은 운영 사용 사례를 구축합니다. 운영 사용 사례는 일반적으로 Apache Kafka 토픽과 같은 메시지 버스에서 데이터를 읽은 다음 대기 시간이 짧은 데이터를 처리하고 처리된 레코드를 메시지 버스에 다시 씁니다. 이 방법을 사용하면 클라우드 스토리지에서 작성하거나 읽지 않음으로써 대기 시간을 줄일 수 있습니다.
- Lakeflow 선언적 파이프라인의 흐름에서 변환된 데이터를 Unity 카탈로그가 관리하는 테이블과 외부 테이블을 포함하여 외부 델타 인스턴스가 관리하는 테이블에 씁니다.
- Apache Kafka 토픽과 같이 Databricks 외부의 싱크로 역방향 ETL(추출-변환-로드)을 수행합니다. 이 방법을 사용하면 Unity 카탈로그 테이블 또는 기타 Databricks 관리 스토리지 외부에서 데이터를 읽거나 사용해야 하는 사용 사례를 효과적으로 지원할 수 있습니다.
- Azure Databricks에서 직접 지원되지 않는 데이터 형식으로 작성해야 합니다. Python 사용자 지정 데이터 원본을 사용하면 사용자 지정 Python 코드를 사용하여 모든 데이터 원본에 쓰는 싱크를 만들 수 있습니다. PySpark 사용자 지정 데이터 원본을 참조하세요.
Lakeflow 선언적 파이프라인 싱크를 사용하려면 어떻게 해야 하나요?
이벤트 데이터가 스트리밍 원본에서 Lakeflow 선언적 파이프라인으로 수집되므로 Lakeflow 선언적 파이프라인 기능을 사용하여 이 데이터를 처리 및 구체화한 다음, 추가 흐름 처리를 사용하여 변환된 데이터 레코드를 Lakeflow 선언적 파이프라인 싱크로 스트리밍합니다.
create_sink()
함수를 사용하여 이 싱크를 만듭니다. 함수에 create_sink
대한 자세한 내용은 싱크 API 참조를 참조하세요.
스트리밍 이벤트 데이터를 만들거나 처리하고 쓰기 위해 데이터 레코드를 준비하는 파이프라인이 있는 경우 Lakeflow 선언적 파이프라인 싱크를 사용할 준비가 된 것입니다.
Lakeflow 선언적 파이프라인 싱크 구현은 다음 두 단계로 구성됩니다.
- Lakeflow 선언적 파이프라인의 데이터 수신점을 생성합니다.
- 추가 흐름 사용하여 준비된 레코드를 싱크로 기록합니다.
Lakeflow 선언적 파이프라인 싱크 생성하기
Databricks는 스트림 데이터에서 처리된 레코드를 기록할 수 있는 다양한 유형의 기록 저장소를 지원합니다.
- 델타 테이블 싱크(유니티 카탈로그 관리 테이블 및 외부 테이블 포함)
- Apache Kafka 데이터 수신점
- Azure Event Hubs 데이터 수신 지점
- Python 사용자 지정 데이터 원본을 사용하여 Python으로 작성된 사용자 지정 싱크
다음은 Delta, Kafka 및 Azure Event Hubs 싱크 및 Python 사용자 지정 데이터 원본에 대한 구성의 예입니다.
델타 싱크
파일 경로별로 델타 싱크를 만들려면 다음을 수행합니다.
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
정규화된 카탈로그 및 스키마 경로를 사용하여 테이블 이름으로 델타 싱크를 만들려면 다음을 수행합니다.
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
Kafka 및 Azure Event Hubs 수신기
이 코드는 Apache Kafka 및 Azure Event Hubs 싱크 모두에서 작동합니다.
credential_name = "<service-credential>"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dlt-sink"
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
Unity credential_name
카탈로그 서비스 자격 증명에 대한 참조입니다. 자세한 내용은 Unity 카탈로그 서비스 자격 증명을 사용하여 외부 클라우드 서비스에 연결하는 방법을 참조하세요.
Python 사용자 지정 데이터 원본
Python 사용자 지정 데이터 원본이 등록되어 my_custom_datasource
있다고 가정하면 다음 코드가 해당 데이터 원본에 쓸 수 있습니다.
from dlt import *
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create DLT sink using my_custom_datasource
create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
create_sink
함수 사용에 대한 자세한 내용은 싱크 API 참조참조하세요.
싱크를 만든 후에는 처리된 레코드를 싱크로 스트리밍할 수 있습니다.
추가 흐름을 사용하여 Lakeflow 선언적 파이프라인 싱크에 쓰기
싱크를 만든 다음 단계는 추가 흐름에서 레코드 출력의 대상으로 지정하여 처리된 레코드를 해당 레코드에 쓰는 것입니다.
target
데코레이터에서 싱크를 append_flow
값으로 지정하여 이 작업을 수행합니다.
- Unity 카탈로그 관리 테이블 및 외부 테이블의 경우
delta
형식을 사용하고 옵션에서 경로 또는 테이블 이름을 지정합니다. Lakeflow 선언적 파이프라인은 Unity 카탈로그를 사용하도록 구성해야 합니다. - Apache Kafka 토픽의 경우
kafka
형식을 사용하고 옵션에서 토픽 이름, 연결 정보 및 인증 정보를 지정합니다. 이러한 옵션은 Spark 구조적 스트리밍 Kafka 싱크에서 지원하는 것과 동일한 옵션입니다. Kafka 구조적 스트리밍 기록기의 설정에 대해 을 참조하십시오. - Azure Event Hubs의 경우
kafka
형식을 사용하고 옵션에서 Event Hubs 이름, 연결 정보 및 인증 정보를 지정합니다. 이러한 옵션은 Kafka 인터페이스를 사용하는 Spark 구조적 스트리밍 Event Hubs 싱크에서 지원되는 동일한 옵션입니다. Microsoft Entra ID와 Azure Event Hubs의 서비스 주체 인증을참조하세요.
다음은 Lakeflow 선언적 파이프라인에서 처리된 레코드를 사용하여 Delta, Kafka 및 Azure Event Hubs 싱크에 쓸 흐름을 설정하는 방법의 예입니다.
델타 싱크
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka 및 Azure Event Hubs 수신기
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
value
매개 변수는 Azure Event Hubs 싱크에 필수입니다.
key
, partition
, headers
및 topic
같은 추가 매개 변수는 선택 사항입니다.
데코레이터에 대한 append_flow
자세한 내용은 여러 흐름을 사용하여 단일 대상에 쓰는 방법을 참조하세요.
제한 사항
Python API만 지원됩니다. SQL은 지원되지 않습니다.
스트리밍 쿼리만 지원됩니다. 일괄 처리 쿼리는 지원되지 않습니다.
배수구에 쓰는 데는
append_flow
만 사용할 수 있습니다. 같은create_auto_cdc_flow
다른 흐름은 지원되지 않으며 Lakeflow 선언적 파이프라인 데이터 세트 정의에서 싱크를 사용할 수 없습니다. 예를 들어 다음이 지원되지 않습니다.@table("from_sink_table") def fromSink(): return read_stream("my_sink")
델타 싱크의 경우 테이블 이름은 완전히 자격이 부여되어야 합니다. 특히 Unity 카탈로그 관리형 외부 테이블의 경우 테이블 이름은
<catalog>.<schema>.<table>
형식이어야 합니다. Hive 메타스토어의 경우<schema>.<table>
형식이어야 합니다.전체 새로 고침 업데이트 실행해도 싱크에서 이전에 계산된 결과 데이터가 정리되지 않습니다. 즉, 다시 처리된 모든 데이터가 싱크에 추가되고 기존 데이터는 변경되지 않습니다.
Lakeflow 선언적 파이프라인 기대치는 지원되지 않습니다.