Lakeflow 선언적 파이프라인에서
중요합니다
이 기능은 공개 미리 보기로 제공됩니다.
이 문서에서는 Lakeflow 선언적 파이프라인의 SQL 함수를 사용하여 JSON Blob의 스키마를 from_json
유추하고 발전시키는 방법을 설명합니다.
개요
SQL 함수는 from_json
JSON 문자열 열을 구문 분석하고 구조체 값을 반환합니다. Lakeflow 선언적 파이프라인 외부에서 사용할 때는 schema
인수를 통해 반환값의 스키마를 명확하게 지정해야 합니다. Lakeflow 선언적 파이프라인과 함께 사용할 경우 스키마 유추 및 진화를 사용하도록 설정하여 반환된 값의 스키마를 자동으로 관리할 수 있습니다. 이 기능은 초기 설정(특히 스키마를 알 수 없는 경우)과 스키마가 자주 변경되는 경우 진행 중인 작업을 모두 간소화합니다.
자동 로더, Kafka 또는 Kinesis와 같은 스트리밍 데이터 원본에서 임의의 JSON Blob을 원활하게 처리할 수 있습니다.
특히 Lakeflow 선언적 파이프라인에서 SQL 함수에 사용되는 경우, 스키마 유추 및 진화는 from_json
다음과 같은 작업을 수행할 수 있습니다.
- 들어오는 JSON 레코드에서 새 필드 검색(중첩된 JSON 개체 포함)
- 필드 형식을 유추하고 적절한 Spark 데이터 형식에 매핑
- 새 필드를 수용하도록 스키마를 자동으로 진화
- 현재 스키마를 준수하지 않는 데이터를 자동으로 처리합니다.
구문: 스키마를 자동으로 유추 및 진화
Lakeflow 선언적 파이프라인과 함께 사용하는 from_json
경우 스키마를 자동으로 유추하고 발전할 수 있습니다. 이를 사용하도록 설정하려면 스키마를 NULL로 설정하고 옵션을 지정합니다 schemaLocationKey
. 이렇게 하면 스키마를 유추하고 추적할 수 있습니다.
SQL (영문)
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))
파이썬
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})
쿼리에는 여러 from_json
식이 있을 수 있지만 각 식에는 고유한 schemaLocationKey
식이 있어야 합니다.
schemaLocationKey
파이프라인당 고유해야 합니다.
SQL (영문)
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
파이썬
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
구문: 고정 스키마
대신 특정 스키마를 적용하려는 경우 다음 from_json
구문을 사용하여 해당 스키마를 사용하여 JSON 문자열을 구문 분석할 수 있습니다.
from_json(jsonStr, schema, [, options])
이 구문은 Lakeflow 선언적 파이프라인을 비롯한 모든 Azure Databricks 환경에서 사용할 수 있습니다. 자세한 내용은 여기를 참조하세요.
스키마 유추
from_json
JSON 데이터 열의 첫 번째 일괄 처리에서 스키마를 유추하고, 내부적으로 schemaLocationKey
에 따라 인덱싱합니다 (필수).
JSON 문자열이 단일 개체(예: {"id": 123, "name": "John"}
) from_json
인 경우 STRUCT 형식의 스키마를 유추하고 필드 목록에 추가합니다 rescuedDataColumn
.
STRUCT<id LONG, name STRING, _rescued_data STRING>
그러나 JSON 문자열에 최상위 배열(예: ["id": 123, "name": "John"]
) from_json
이 있는 경우 STRUCT에서 배열을 래핑합니다. 이 방법을 사용하면 유추된 스키마와 호환되지 않는 데이터를 다시 가져올 수 있습니다. 배열 값을 별도의 행 다운스트림으로 분해 하는 옵션이 있습니다.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
스키마 힌트를 사용하여 스키마 유추 재정의
선택적으로 schemaHints
를 제공하여 from_json
가 열 형식을 유추하는 방식에 영향을 줄 수 있습니다. 열이 특정 데이터 형식임을 알고 있거나 보다 일반적인 데이터 형식(예: 정수 대신 double)을 선택하려는 경우에 유용합니다. SQL 스키마 사양 구문을 사용하여 열 데이터 형식에 대한 임의의 힌트 수를 제공할 수 있습니다. 스키마 힌트의 의미 체계는 자동 로더 스키마 힌트와 동일합니다. 다음은 그 예입니다.
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)
JSON 문자열에 최상위 배열이 포함되어 있으면 STRUCT로 래핑됩니다. 이러한 경우 스키마 힌트는 래핑된 STRUCT 대신 배열 스키마에 적용됩니다. 예를 들어 다음과 같은 최상위 배열이 있는 JSON 문자열을 고려합니다.
[{"id": 123, "name": "John"}]
유추된 배열 스키마는 STRUCT에 래핑됩니다.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
데이터 id
형식을 변경하려면 스키마 힌트를 STRING으로 element.id
지정합니다. DOUBLE 형식의 새 열을 추가하려면 DOUBLE을 지정합니다 element.new_col
. 이러한 힌트로 인해 최상위 JSON 배열의 스키마는 다음과 같습니다.
struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
스키마를 schemaEvolutionMode
를 사용하여 진화시킵니다.
from_json
는 데이터를 처리할 때 새 열이 추가되는 것을 감지합니다. 새 필드를 검색하면 from_json
새 열을 스키마의 끝에 병합하여 유추된 스키마를 최신 스키마로 업데이트합니다. 기존 열의 데이터 형식은 변경되지 않은 상태로 유지됩니다. 스키마 업데이트 후 업데이트된 스키마를 사용하여 파이프라인이 자동으로 다시 시작됩니다.
from_json
는 선택적 schemaEvolutionMode
설정을 사용하여 설정한 스키마 진화를 위해 다음 모드를 지원합니다. 이러한 모드는 자동 로더와 일치합니다.
schemaEvolutionMode |
새 열을 읽는 동작 |
---|---|
addNewColumns (기본값) |
스트림이 실패합니다. 새 열이 스키마에 추가됩니다. 기존 열의 데이터 형식은 진화되지 않습니다. |
rescue |
스키마는 절대 진화되지 않으며 스키마 변경으로 인해 스트림이 실패하지 않습니다. 모든 새 열이 복구된 데이터 열에 기록됩니다. |
failOnNewColumns |
스트림이 실패합니다. 업데이트되거나 잘못된 데이터가 제거되지 않는 한 schemaHints 스트림이 다시 시작되지 않습니다. |
none |
rescuedDataColumn 옵션을 설정하지 않으면 스키마가 진화하지 않고 새 열이 무시되며 데이터가 복구되지 않습니다. 스키마 변경으로 인해 스트림이 실패하지 않습니다. |
다음은 그 예입니다.
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)
복구된 데이터 열
구조된 데이터 열은 스키마에 _rescued_data
로 자동으로 추가됩니다. 옵션을 설정 rescuedDataColumn
하여 열 이름을 바꿀 수 있습니다. 다음은 그 예입니다.
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
구조된 데이터 열을 사용하도록 선택하면 유추된 스키마와 일치하지 않는 모든 열이 삭제되는 대신 구조됩니다. 데이터 형식 불일치, 스키마의 누락된 열 또는 열 이름 대/소문자 차이로 인해 발생할 수 있습니다.
손상된 레코드 처리
형식이 잘못되어 구문 분석할 수 없는 레코드를 저장하려면 다음 예제와 같이 스키마 힌트를 설정하여 열을 추가 _corrupt_record
합니다.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
손상된 레코드 열의 이름을 바꾸려면 옵션을 설정합니다 columnNameOfCorruptRecord
.
JSON 파서는 손상된 레코드를 처리하기 위한 세 가지 모드를 지원합니다.
모드 | 설명 |
---|---|
PERMISSIVE |
손상된 레코드의 경우 잘못된 형식의 문자열을 구성한 columnNameOfCorruptRecord 필드에 넣고 형식이 잘못된 필드를 null .로 설정합니다. 손상된 레코드를 유지하려면 사용자 정의 스키마에서 columnNameOfCorruptRecord 문자열 형식 필드를 설정할 수 있습니다. 스키마에 필드가 없으면 구문 분석 중에 손상된 레코드가 삭제됩니다. 스키마를 유추할 때 파서는 출력 스키마에 필드를 암시적으로 추가합니다 columnNameOfCorruptRecord . |
DROPMALFORMED |
손상된 레코드를 무시합니다.DROPMALFORMED 모드를 rescuedDataColumn 와 함께 사용할 때, 데이터 형식 불일치로 인해 레코드가 삭제되지 않습니다. 불완전하거나 형식이 잘못된 JSON과 같이 손상된 레코드만 삭제됩니다. |
FAILFAST |
파서가 손상된 레코드를 발견할 때 예외를 발생시킵니다.FAILFAST 모드를 rescuedDataColumn 와 함께 사용하면 데이터 유형 불일치 때문에 오류가 발생하지 않습니다. 손상된 레코드만 오류를 발생시킵니다. 예를 들어, JSON이 불완전하거나 형식이 잘못된 경우가 있습니다. |
from_json 출력의 필드를 참조하세요.
from_json
는 파이프라인 실행 중에 스키마를 유추합니다.
from_json
함수가 적어도 한 번 이상 성공적으로 실행되기 전에 다운스트림 쿼리가 from_json
필드를 참조하면, 필드가 해결되지 않고 쿼리가 건너뜁니다. 다음 예제에서는 브론즈 쿼리의 from_json
함수가 실행되어 스키마를 유추할 때까지 실버 테이블 쿼리에 대한 분석을 건너뜁니다.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
from_json
함수와 함수가 유추하는 필드를 동일한 쿼리에서 참조하는 경우 다음 예제와 같이 분석이 실패할 수 있습니다.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
필드에 대한 참조를 from_json
다운스트림 쿼리(예: 위의 bronze/silver 예제)로 이동하여 이 문제를 해결할 수 있습니다. 또는 참조 schemaHints
된 필드가 포함된 필드를 지정할 from_json
수 있습니다. 다음은 그 예입니다.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
예: 스키마를 자동으로 유추 및 진화
이 섹션에서는 Lakeflow 선언적 파이프라인에서 자동 스키마 유추 및 진화를 사용하도록 from_json
설정하는 예제 코드를 제공합니다.
클라우드 개체 스토리지에서 스트리밍 테이블 만들기
다음 예제에서는 구문을 사용하여 read_files
클라우드 개체 스토리지에서 스트리밍 테이블을 만듭니다.
SQL (영문)
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
파이썬
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Kafka에서 스트리밍 테이블 만들기
다음 예제에서는 구문을 사용하여 read_kafka
Kafka에서 스트리밍 테이블을 만듭니다.
SQL (영문)
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
FROM READ_KAFKA(
bootstrapSevers => '<server:ip>',
subscribe => 'events',
"startingOffsets", "latest"
)
파이썬
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
예: 고정 스키마
고정 스키마와 함께 사용하는 from_json
코드 예제는 함수를 참조하세요from_json
.
자주 묻는 질문 (FAQ)
이 섹션에서는 함수의 스키마 유추 및 진화 지원에 대한 질문과 대답을 from_json
제공합니다.
from_json
와 parse_json
의 차이점은 무엇인가요?
parse_json
함수는 JSON 문자열에서 VARIANT
값을 반환합니다.
VARIANT는 반구조화된 데이터를 저장하는 유연하고 효율적인 방법을 제공합니다. 이렇게 하면 엄격한 형식을 완전히 제거하여 스키마 유추 및 진화를 피할 수 있습니다. 그러나 쓰기 시간에 스키마를 적용하려는 경우(예: 비교적 엄격한 스키마가 있기 때문에) from_json
더 나은 옵션이 될 수 있습니다.
다음 표에서는 차이점과 다음의 from_json
parse_json
차이점을 설명합니다.
기능 | 사용 사례 | 가용도 |
---|---|---|
from_json |
스키마 진화를 통해 from_json 스키마를 유지합니다. 이 기능은 다음과 같은 경우에 유용합니다.
|
Lakeflow 선언적 파이프라인에서만 스키마 유추 및 진화와 함께 사용 가능 |
parse_json |
VARIANT는 스키마화할 필요가 없는 데이터를 보유하는 데 특히 적합합니다. 다음은 그 예입니다.
|
Lakeflow 선언적 파이프라인이 포함된 버전과 포함되지 않은 버전으로 제공됩니다. |
Lakeflow 선언적 파이프라인 외부에서 스키마 유추 및 진화 구문을 사용할 from_json
수 있나요?
아니요, Lakeflow 선언적 파이프라인 외부에서는 스키마 유추 및 진화 구문을 사용할 from_json
수 없습니다.
유추 from_json
된 스키마에 액세스하려면 어떻게 해야 하나요?
대상 스트리밍 테이블의 스키마를 봅니다.
스키마를 전달 from_json
하고 진화를 수행할 수 있나요?
아니요, 스키마를 전달할 from_json
수 없으며 진화도 수행할 수 없습니다. 그러나 스키마 힌트를 제공하여 유추된 from_json
필드의 일부 또는 전부를 재정의할 수 있습니다.
테이블이 완전히 새로 고쳐지면 스키마는 어떻게 되나요?
테이블과 연결된 스키마 위치가 지워지고 스키마가 처음부터 다시 유추됩니다.