Important
이 기능은 공개 미리 보기로 제공됩니다.
이 함수는 create_auto_cdc_from_snapshot_flow
Lakeflow 선언적 파이프라인 CDC(변경 데이터 캡처) 기능을 사용하여 데이터베이스 스냅샷에서 원본 데이터를 처리하는 흐름을 만듭니다.
AUTO CDC FROM SNAPSHOT
API를 사용하여 CDC를 구현하는 방법은 참조하세요..
Note
이 함수는 이전 함수 apply_changes_from_snapshot()
를 대체합니다. 두 함수의 서명은 동일합니다. Databricks는 새 이름을 사용하도록 업데이트하는 것이 좋습니다.
Syntax
import dlt
dlt.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Note
AUTO CDC FROM SNAPSHOT
처리의 경우 동일한 키를 가진 일치하는 레코드가 대상에 없는 경우 새 행을 삽입하는 것이 기본 동작입니다. 일치하는 레코드가 있는 경우 행의 값이 변경된 경우에만 업데이트됩니다. 대상에 키가 있지만 더 이상 원본에 없는 행이 삭제됩니다.
스냅샷을 사용한 CDC 처리에 대한 자세한 내용은 AUTO CDC API: Lakeflow 선언적 파이프라인을 사용하여 변경 데이터 캡처 간소화를 참조하세요.
create_auto_cdc_from_snapshot_flow()
함수를 사용하는 예제는 주기적인 스냅샷 수집 및 기록 스냅샷 수집 예제를 참조하세요.
Parameters
Parameter | Type | Description |
---|---|---|
target |
str |
Required. 업데이트할 테이블의 이름입니다.
create_streaming_table() 함수를 사용하여 create_auto_cdc_from_snapshot_flow() 함수를 실행하기 전에 대상 테이블을 만들 수 있습니다. |
source |
str 또는 lambda function |
Required. 주기적으로 스냅샷할 테이블 또는 뷰의 이름 또는 처리할 스냅샷 DataFrame 및 스냅샷 버전을 반환하는 Python 람다 함수입니다.
source 인수구현을 참조하세요. |
keys |
list |
Required. 원본 데이터의 행을 고유하게 식별하는 열 또는 열의 조합입니다. 이는 대상 테이블의 특정 레코드에 적용되는 CDC 이벤트를 식별하는 데 사용됩니다. 다음 중 하나를 지정할 수 있습니다.
|
stored_as_scd_type |
str 또는 int |
레코드를 SCD 형식 1 또는 SCD 형식 2로 저장할지 여부입니다. SCD 유형 1의 경우 1 , SCD 유형 2의 경우 2 로 설정합니다. 기본값은 SCD 형식 1입니다. |
track_history_column_list 또는 track_history_except_column_list |
list |
대상 테이블의 기록에 대해 추적할 출력 열의 하위 집합입니다.
track_history_column_list 를 사용하여 추적할 열의 전체 목록을 지정합니다. 추적에서 제외할 열을 지정하는 데 사용합니다 track_history_except_column_list . 값을 문자열 목록 또는 Spark SQL col() 함수로 선언할 수 있습니다.
col() 함수에 대한 인수에는 한정자를 포함할 수 없습니다. 예를 들어, col(userId) 는 사용할 수 있지만 col(source.userId) 는 사용할 수 없습니다. 기본값은 함수에 track_history_column_list 또는 track_history_except_column_list 인수가 전달되지 않은 경우 대상 테이블의 모든 열을 포함하는 것입니다. |
source
인수 구현
create_auto_cdc_from_snapshot_flow()
함수에는 source
인수가 포함됩니다. 기록 스냅샷을 처리하기 위해 source
인수는 처리할 스냅샷 데이터와 스냅샷 버전을 포함하는 Python DataFrame이라는 두 값을 create_auto_cdc_from_snapshot_flow()
함수에 반환하는 Python 람다 함수여야 합니다.
다음은 람다 함수의 서명입니다.
lambda Any => Optional[(DataFrame, Any)]
- 람다 함수의 인수는 가장 최근에 처리된 스냅샷 버전입니다.
- 람다 함수의 반환 값은
None
또는 두 값의 튜플입니다. 튜플의 첫 번째 값은 처리할 스냅샷을 포함하는 DataFrame입니다. 튜플의 두 번째 값은 스냅샷의 논리적 순서를 나타내는 스냅샷 버전입니다.
람다 함수를 구현하고 호출하는 예제:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Lakeflow 선언적 파이프라인 런타임은 create_auto_cdc_from_snapshot_flow()
함수를 포함한 파이프라인이 트리거될 때마다 다음 단계를 수행합니다:
-
next_snapshot_and_version
함수를 실행하여 다음 스냅샷 DataFrame 및 해당 스냅샷 버전을 로드합니다. - DataFrame이 반환되지 않으면 실행이 종료되고 파이프라인 업데이트가 완료된 것으로 표시됩니다.
- 새 스냅샷의 변경 내용을 검색하고 점진적으로 대상 테이블에 적용합니다.
- 1단계로 돌아와서 다음 스냅샷 및 해당 버전을 로드합니다.