데이터 오케스트레이션 및 자동 로더에 대해 Lakeflow 선언적 파이프라인을 사용하여 CDC(변경 데이터 캡처)를 사용하여 ETL(추출, 변환 및 로드) 파이프라인을 만들고 배포하는 방법을 알아봅니다. ETL 파이프라인은 원본 시스템에서 데이터를 읽고, 데이터 품질 검사 및 중복 제거 기록과 같은 요구 사항에 따라 해당 데이터를 변환하고, 데이터 웨어하우스 또는 데이터 레이크와 같은 대상 시스템에 데이터를 쓰는 단계를 구현합니다.
이 자습서에서는 MySQL 데이터베이스의 customers
테이블 데이터를 사용하여 다음을 수행합니다.
- Debezium 또는 다른 도구를 사용하여 트랜잭션 데이터베이스에서 변경 내용을 추출하고 클라우드 개체 스토리지(S3 폴더, ADLS, GCS)에 저장합니다. 자습서를 간소화하기 위해 외부 CDC 시스템 설정을 건너뜁니다.
- 자동 로더를 사용하여 클라우드 개체 스토리지에서 메시지를 증분 방식으로 로드하고 원시 메시지를
customers_cdc
테이블에 저장합니다. 자동 로더는 스키마를 유추하고 스키마 진화를 처리합니다. - 보기를
customers_cdc_clean
추가하여 예상을 사용하여 데이터 품질을 확인합니다. 예를 들어id
은 항상null
이어야 하지 않습니다, 왜냐하면 이것을 upsert 작업을 실행하는 데 사용할 것이기 때문입니다. - 정리된
AUTO CDC ... INTO
CDC 데이터에서 upsert를 수행하여 최종customers
테이블에 변경 내용을 적용합니다. - Lakeflow 선언적 파이프라인이 모든 변경 내용을 추적하기 위해 SCD2(느린 변경 차원) 유형 2를 만드는 방법을 보여 줍니다.
목표는 거의 실시간으로 원시 데이터를 수집하고 데이터 품질을 보장하면서 분석가 팀을 위한 테이블을 작성하는 것입니다.
이 자습서에서는 medallion Lakehouse 아키텍처를 사용하여 브론즈 계층을 통해 원시 데이터를 수집하고, 은색 계층으로 데이터를 정리하고 유효성을 검사하며, 골드 레이어를 사용하여 차원 모델링 및 집계를 적용합니다. 자세한 내용은 메달리온 레이크하우스 아키텍처가 무엇인지에 대해 참조하세요.
구현할 흐름은 다음과 같습니다.
Lakeflow 선언적 파이프라인, 자동 로더 및 CDC에 대한 자세한 내용은 Lakeflow 선언적 파이프라인, 자동 로더란? 및 CDC(변경 데이터 캡처)란?
Requirements
이 자습서를 완료하려면 다음 요구 사항을 충족해야 합니다.
- Azure Databricks 작업 영역에 로그인합니다.
- 작업 영역에 Unity 카탈로그 를 사용하도록 설정합니다.
- 계정에 대해 서버리스 컴퓨팅 을 사용하도록 설정해야 합니다. 서버리스 Lakeflow 선언적 파이프라인은 모든 작업 영역 지역에서 사용할 수 없습니다. 사용 가능한 지역에 대한 지역 가용성이 제한된 기능을 참조하세요.
- 컴퓨팅 리소스를 만들거나 컴퓨팅 리소스에액세스할 수 있는 권한이 있습니다.
-
카탈로그에 새 스키마를 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES
또는USE CATALOG
및CREATE SCHEMA
입니다. -
기존 스키마에 새 볼륨을 만들 수 있는 권한이 있습니다. 필요한 사용 권한은
ALL PRIVILEGES
또는USE SCHEMA
및CREATE VOLUME
입니다.
ETL 파이프라인에서 데이터 캡처 변경
CDC(변경 데이터 캡처)는 트랜잭션 데이터베이스(예: MySQL 또는 PostgreSQL) 또는 데이터 웨어하우스에 대한 레코드의 변경 내용을 캡처하는 프로세스입니다. CDC는 일반적으로 외부 시스템에서 테이블을 다시 구체화하기 위한 스트림으로 데이터 삭제, 추가 및 업데이트와 같은 작업을 캡처합니다. CDC를 사용하면 대량 로드 업데이트가 필요하지 않고도 증분 로드가 가능합니다.
Note
자습서를 간소화하려면 외부 CDC 시스템 설정을 건너뜁니다. BLOB 스토리지(S3, ADLS, GCS)에서 CDC 데이터를 실행 및 JSON 파일로 저장하는 것을 고려할 수 있습니다.
Capturing CDC
다양한 CDC 도구를 사용할 수 있습니다. 오픈 소스 리더 솔루션 중 하나는 Debezium이지만, 데이터 원본을 간소화하는 다른 구현(예: Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate 및 AWS DMS)이 있습니다.
이 자습서에서는 Debezium 또는 DMS와 같은 외부 시스템의 CDC 데이터를 사용합니다. Debezium은 변경된 모든 행을 캡처합니다. 일반적으로 Kafka 로그에 데이터 변경 기록을 보내거나 파일로 저장합니다.
테이블(JSON 형식)에서 customers
CDC 정보를 수집하고 올바른지 확인한 다음 Lakehouse에서 고객 테이블을 구체화해야 합니다.
Debezium의 CDC 입력
변경될 때마다 업데이트되는 행의 모든 필드(id
, , firstname
, lastname
email
address
)가 포함된 JSON 메시지가 표시됩니다. 또한 다음을 비롯한 추가 메타데이터 정보가 제공됩니다.
-
operation
: 작업 코드(일반적으로 (DELETE
,APPEND
,UPDATE
)) -
operation_date
: 각 작업 작업에 대한 레코드의 날짜 및 타임스탬프입니다.
Debezium과 같은 도구는 변경 전의 행 값과 같은 고급 출력을 생성할 수 있지만 이 자습서에서는 간단히 생략합니다.
0단계: 자습서 데이터 설정
먼저 새 Notebook을 만들고 이 자습서에 사용된 데모 파일을 작업 영역에 설치해야 합니다.
왼쪽 위 모서리에서 새로 만들기를 클릭합니다.
Click Notebook.
<에서 Pipelines 자습서 설정>으로 제목을 변경합니다.
전자 필기장 제목의 맨 위에 있는 옆에서 전자 필기장의 기본 언어를 Python으로 설정합니다.
을 클릭하여 Notebook 환경 패널을 엽니다. (환경)는 Notebook 편집기의 오른쪽 사이드바에 있습니다.
이
faker
노트북은 예제를 위한 가짜 데이터를 생성하는 데 사용됩니다. Notebook의 종속성으로 추가faker
한 다음 환경 패널 아래쪽에서 적용 을 클릭합니다.자습서에서 사용되는 데이터 세트를 생성하려면 첫 번째 셀에 다음 코드를 입력하고 Shift + Enter 를 입력하여 코드를 실행합니다.
# You can change the catalog, schema, dbName, and db. If you do so, you must also # change the names in the rest of the tutorial. catalog = "main" schema = dbName = db = "dbdemos_dlt_cdc" volume_name = "raw_data" spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`') spark.sql(f'USE CATALOG `{catalog}`') spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`') spark.sql(f'USE SCHEMA `{schema}`') spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`') volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" try: dbutils.fs.ls(volume_folder+"/customers") except: print(f"folder doesn't exists, generating the data under {volume_folder}...") from pyspark.sql import functions as F from faker import Faker from collections import OrderedDict import uuid fake = Faker() import random fake_firstname = F.udf(fake.first_name) fake_lastname = F.udf(fake.last_name) fake_email = F.udf(fake.ascii_company_email) fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S")) fake_address = F.udf(fake.address) operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)]) fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0]) fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None) df = spark.range(0, 100000).repartition(100) df = df.withColumn("id", fake_id()) df = df.withColumn("firstname", fake_firstname()) df = df.withColumn("lastname", fake_lastname()) df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) df = df.withColumn("operation", fake_operation()) df_customers = df.withColumn("operation_date", fake_date()) df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
이 자습서에 사용된 데이터를 미리 보려면 다음 셀에 이 코드를 입력하고 Shift + Enter 를 입력하여 코드를 실행합니다. 이전 셀의 경로를 변경한 경우 여기에 해당하는 변경 내용을 적용해야 합니다.
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
1단계: 파이프라인 만들기
먼저 Lakeflow 선언적 파이프라인에서 ETL 파이프라인을 만듭니다. Lakeflow 선언적 파이프라인은 Lakeflow 선언적 파이프라인 구문을 사용하여 Notebook 또는 파일( 소스 코드라고 함)에 정의된 종속성을 확인하여 파이프라인을 만듭니다. 각 소스 코드 파일에는 하나의 언어만 포함될 수 있지만 파이프라인에 여러 언어별 Notebook 또는 파일을 추가할 수 있습니다. 자세한 내용은 Lakeflow 선언적 파이프라인을 참조하세요.
Important
소스 코드 필드를 비워 두면 소스 코드 작성을 위해 Notebook을 자동으로 만들고 구성합니다.
이 자습서에서는 서버리스 컴퓨팅 및 Unity 카탈로그를 사용합니다. 지정되지 않은 모든 구성 옵션에 대해 기본 설정을 사용합니다. 작업 영역에서 서버리스 컴퓨팅을 사용하도록 설정하거나 지원하지 않는 경우 기본 컴퓨팅 설정을 사용하여 작성된 자습서를 완료할 수 있습니다. 기본 컴퓨팅 설정을 사용하는 경우 파이프라인 UI 만들기의 대상 섹션에 있는 스토리지 옵션 아래에서 Unity 카탈로그를 수동으로 선택해야 합니다.
Lakeflow 선언적 파이프라인에서 새 ETL 파이프라인을 만들려면 다음 단계를 수행합니다.
- 작업 영역에서
사이드바의 작업 및 파이프라인입니다.
- 새로 만들기에서 ETL 파이프라인을 클릭합니다.
- 파이프라인 이름에서 고유한 파이프라인 이름을 입력합니다.
- 서버리스 확인란을 선택합니다.
- 파이프라인 모드에서 Triggered를 선택합니다. 그러면 모든 기존 데이터를 처리한 다음 스트림을 종료하는 AvailableNow 트리거를 사용하여 스트리밍 흐름을 실행합니다.
- 대상에서 테이블이 게시되는 Unity 카탈로그 위치를 구성하려면 기존 카탈로그를 선택하고 스키마에 새 이름을 작성하여 카탈로그에 새 스키마를 만듭니다.
- Click Create.
새 파이프라인에 대한 파이프라인 UI가 나타납니다.
빈 소스 코드 노트북이 파이프라인 용도로 자동 생성되고 구성됩니다. 노트는 사용자 디렉터리 안에 새로운 디렉터리가 생성되어 만들어집니다. 새 디렉터리 및 파일의 이름이 파이프라인의 이름과 일치합니다. 예를 들어 /Users/someone@example.com/my_pipeline/my_pipeline
.
- 이 Notebook에 액세스하기 위한 링크는 파이프라인 세부 정보 패널의 소스 코드 필드 아래에 있습니다. 다음 단계로 진행하기 전에 링크를 클릭하여 전자 필기장을 엽니다.
- 오른쪽 위에서 연결 클릭하여 컴퓨팅 구성 메뉴를 엽니다.
- 1단계에서 만든 파이프라인의 이름을 마우스로 가리킵니다.
- Click Connect.
- 맨 위에 있는 Notebook의 제목 옆에 있는 Notebook의 기본 언어(Python 또는 SQL)를 선택합니다.
Important
Notebook은 단일 프로그래밍 언어만 포함할 수 있습니다. 파이프라인 소스 코드 Notebook에서 Python 및 SQL 코드를 혼합하지 마세요.
Lakeflow 선언적 파이프라인을 개발할 때 Python 또는 SQL을 선택할 수 있습니다. 이 자습서에는 두 언어에 대한 예제가 포함되어 있습니다. 언어 선택에 따라 기본 전자 필기장 언어를 선택해야 합니다.
Lakeflow 선언적 파이프라인 코드 개발을 위한 Notebook 지원에 대한 자세한 내용은 Lakeflow 선언적 파이프라인에서 Notebook을 사용하여 ETL 파이프라인 개발 및 디버그를 참조하세요.
2단계: 자동 로더를 사용하여 데이터 증분 수집
첫 번째 단계는 클라우드 스토리지의 원시 데이터를 브론즈 계층으로 수집하는 것입니다.
이는 여러 가지 이유로 어려울 수 있으며, 해야 할 것은 다음과 같습니다.
- 대규모로 작동하여 수백만 개의 작은 파일을 수집할 수 있습니다.
- 유추 스키마 및 JSON 타입.
- 잘못된 JSON 스키마를 사용하여 잘못된 레코드를 처리합니다.
- 스키마 진화(예: 고객 테이블의 새 열)를 처리합니다.
자동 로더는 스키마 유추 및 스키마 진화를 포함하여 이 수집을 간소화하면서 수백만 개의 들어오는 파일로 확장합니다. 자동 로더는 Python에서 사용 cloudFiles
및 SQL SELECT * FROM STREAM read_files(...)
에서 사용할 수 있으며 다양한 형식(JSON, CSV, Apache Avro 등)과 함께 사용할 수 있습니다.
테이블을 스트리밍 테이블로 정의하면 새 들어오는 데이터만 사용할 수 있습니다. 스트리밍 테이블로 정의하지 않으면 사용 가능한 모든 데이터를 검색하고 수집합니다. 자세한 내용은 스트리밍 테이블을 참조하세요.
자동 로더를 사용하여 들어오는 데이터를 수집하려면 다음 코드를 복사하여 Notebook의 첫 번째 셀에 붙여넣습니다. 이전 단계에서 선택한 Notebook의 기본 언어에 따라 Python 또는 SQL을 사용할 수 있습니다.
Python
from dlt import * from pyspark.sql.functions import * # Create the target bronze table dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone") # Create an Append Flow to ingest the raw data into the bronze table @append_flow( target = "customers_cdc_bronze", name = "customers_bronze_ingest_flow" ) def customers_bronze_ingest_flow(): return ( spark.readStream .format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_bronze_ingest_flow AS INSERT INTO customers_cdc_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers", format => "json", inferColumnTypes => "true" )
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
3단계: 데이터 품질을 추적하기 위한 정리 및 기대
브론즈 계층이 정의되면 다음 조건을 확인하여 데이터 품질을 제어하는 기대치를 추가하여 실버 계층을 만듭니다.
- ID는 절대로
null
가 될 수 없습니다. - CDC 작업 유형이 유효해야 합니다.
-
json
자동 로더에서 적절하게 읽혔어야 합니다.
이러한 조건 중 하나가 적용되지 않으면 행이 삭제됩니다.
자세한 내용은 파이프라인 기대치를 사용하여 데이터 품질 관리를 참조하세요.
편집을 클릭하고 아래 셀 삽입을 클릭하여 새 빈 셀을 추가합니다.
정리된 테이블이 있는 실버 레이어를 만들고 제약 조건을 적용하려면 다음 코드를 복사하여 Notebook의 새 셀에 붙여넣습니다.
Python
dlt.create_streaming_table( name = "customers_cdc_clean", expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"} ) @append_flow( target = "customers_cdc_clean", name = "customers_cdc_clean_flow" ) def customers_cdc_clean_flow(): return ( dlt.read_stream("customers_cdc_bronze") .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data") )
SQL
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean ( CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW, CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW ) COMMENT "New customer data incrementally ingested from cloud object storage landing zone"; CREATE FLOW customers_cdc_clean_flow AS INSERT INTO customers_cdc_clean BY NAME SELECT * FROM STREAM customers_cdc_bronze;
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
4단계: AUTO CDC 흐름을 사용하여 고객 테이블 구체화
테이블에는 customers
가장 up-to-date 뷰가 포함되며 원래 테이블의 복제본이 됩니다.
수동으로 구현하는 것은 쉽지 않습니다. 최신 행을 유지하려면 데이터 중복 제거와 같은 항목을 고려해야 합니다.
Lakeflow 선언적 파이프라인은 AUTO CDC
작업으로 이러한 문제를 해결합니다.
편집을 클릭하고 아래 셀 삽입을 클릭하여 새 빈 셀을 추가합니다.
Lakeflow 선언적 파이프라인에서 사용하여
AUTO CDC
CDC 데이터를 처리하려면 다음 코드를 복사하여 Notebook의 새 셀에 붙여넣습니다.Python
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers") dlt.create_auto_cdc_flow( target="customers", # The customer table being materialized source="customers_cdc_clean", # the incoming CDC keys=["id"], # what we'll be using to match the rows to upsert sequence_by=col("operation_date"), # de-duplicate by operation date, getting the most recent value ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), # DELETE condition except_column_list=["operation", "operation_date", "_rescued_data"], )
SQL
CREATE OR REFRESH STREAMING TABLE customers; CREATE FLOW customers_cdc_flow AS AUTO CDC INTO customers FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 1;
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
5단계: 유형 2의 느린 변경 차원(SCD2)
다음과 같은 결과로 발생하는 모든 변경 내용을 추적하는 테이블을 만들어야 하는 APPEND
UPDATE
DELETE
경우가 많습니다.
- 기록: 테이블에 대한 모든 변경 내용의 기록을 유지하려고 합니다.
- 추적 가능성: 어떤 작업이 발생했는지 확인하려고 합니다.
Lakeflow 선언적 파이프라인을 사용하는 SCD2
Delta는 CDF(변경 데이터 흐름)를 지원하며 SQL 및 table_change
Python에서 테이블 수정을 쿼리할 수 있습니다. 그러나 CDF의 주요 사용 사례는 파이프라인에서 변경 내용을 캡처하고 처음부터 테이블 변경 내용의 전체 보기를 만들지 않는 것입니다.
순서가 다른 이벤트가 있는 경우 구현하는 것이 특히 복잡해집니다. 변경 내용을 타임스탬프별로 순서를 지정하고 과거에 발생한 수정 내용을 수신해야 하는 경우 SCD 테이블에 새 항목을 추가하고 이전 항목을 업데이트해야 합니다.
Lakeflow 선언적 파이프라인은 이러한 복잡성을 제거하고 처음부터 모든 수정 사항을 포함하는 별도의 테이블을 만들 수 있습니다. 이 테이블은 필요한 경우 특정 파티션/zorder 열과 함께 대규모로 사용할 수 있습니다. 순서가 다른 필드는 _sequence_by를 기반으로 별도의 설정 없이 자동으로 처리됩니다.
SCD2 테이블을 만들려면 SQL에서 STORED AS SCD TYPE 2
옵션을 사용하거나 Python에서 stored_as_scd_type="2"
옵션을 사용해야 합니다.
Note
이 옵션을 사용하여 기능이 추적하는 열을 제한할 수도 있습니다. TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}
편집을 클릭하고 아래 셀 삽입을 클릭하여 새 빈 셀을 추가합니다.
다음 코드를 복사하여 Notebook의 새 셀에 붙여넣습니다.
Python
# create the table dlt.create_streaming_table( name="customers_history", comment="Slowly Changing Dimension Type 2 for customers" ) # store all changes as SCD2 dlt.create_auto_cdc_flow( target="customers_history", source="customers_cdc_clean", keys=["id"], sequence_by=col("operation_date"), ignore_null_updates=False, apply_as_deletes=expr("operation = 'DELETE'"), except_column_list=["operation", "operation_date", "_rescued_data"], stored_as_scd_type="2", ) # Enable SCD2 and store individual updates
SQL
CREATE OR REFRESH STREAMING TABLE customers_history; CREATE FLOW cusotmers_history_cdc AS AUTO CDC INTO customers_history FROM stream(customers_cdc_clean) KEYS (id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_date COLUMNS * EXCEPT (operation, operation_date, _rescued_data) STORED AS SCD TYPE 2;
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
6단계: 정보를 가장 많이 변경한 사용자를 추적하는 구체화된 뷰 만들기
테이블에 customers_history
는 사용자가 해당 정보에 대해 변경한 모든 기록 변경 내용이 포함되어 있습니다. 이제 골드 레이어에서 정보를 가장 많이 변경한 사용자를 추적하는 간단한 구체화된 뷰를 만듭니다. 실제 시나리오에서 사기 탐지 분석 또는 사용자 권장 사항에 사용할 수 있습니다. 또한 SCD2를 사용하여 변경 내용을 적용하면 이미 중복 항목이 제거되었으므로 사용자 ID당 행 수를 직접 계산할 수 있습니다.
편집을 클릭하고 아래 셀 삽입을 클릭하여 새 빈 셀을 추가합니다.
다음 코드를 복사하여 Notebook의 새 셀에 붙여넣습니다.
Python
@dlt.table( name = "customers_history_agg", comment = "Aggregated customer history" ) def customers_history_agg(): return ( dlt.read("customers_history") .groupBy("id") .agg( count("address").alias("address_count"), count("email").alias("email_count"), count("firstname").alias("firstname_count"), count("lastname").alias("lastname_count") ) )
SQL
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS SELECT id, count("address") as address_count, count("email") AS email_count, count("firstname") AS firstname_count, count("lastname") AS lastname_count FROM customers_history GROUP BY id
시작을 클릭하여 연결된 파이프라인에 대한 업데이트를 시작합니다.
7단계: ETL 파이프라인을 실행하는 작업 만들기
다음으로 Databricks 작업을 사용하여 데이터 수집, 처리 및 분석 단계를 자동화하는 워크플로를 만듭니다.
- 작업 영역에서
사이드바의 작업 및 파이프라인입니다.
- 새로 만들기에서 작업을 클릭합니다.
- 작업 제목 상자에서 새 작업 <날짜 및 시간을> 작업 이름으로 바꿉 있습니다. 예를 들어
CDC customers workflow
. -
작업 이름에서 첫 번째 작업 이름(예:
ETL_customers_data
)을 입력합니다. - 형식에서 파이프라인을 선택합니다.
- 파이프라인에서 1단계에서 만든 파이프라인을 선택합니다.
- Click Create.
- 워크플로를 실행하려면 지금 실행을 클릭합니다. 실행에 대한 세부 정보를 보려면 실행 탭을 클릭합니다. 작업을 클릭하여 작업 실행에 대한 세부 정보를 봅니다.
- 워크플로가 완료될 때 결과를 보려면 성공한 최신 실행으로 이동 또는 작업 실행 시작 시간을 클릭합니다. 출력 페이지가 나타나고 쿼리 결과가 표시됩니다.
작업 실행에 대한 자세한 내용은 Lakeflow 작업의 모니터링 및 관찰 가능성을 참조하세요.
8단계: 작업 예약
일정에 따라 ETL 파이프라인을 실행하려면 다음 단계를 수행합니다.
-
사이드바의 작업 및 파이프라인입니다.
- 필요에 따라 작업 필터와 내 소유 필터를 선택합니다.
- 이름 열에서 작업 이름을 클릭합니다. 측면 패널이 작업 세부 정보로 표시됩니다.
- 일정 및 트리거 패널에서 트리거 추가를 클릭하고 트리거 유형에서 예약을 선택합니다.
- 기간, 시작 시간 및 표준 시간대를 지정합니다.
- Click Save.