다음을 통해 공유


Lakeflow 선언적 파이프라인에서 from_json을 사용하여 스키마를 유추하고 진화시키기

중요합니다

이 기능은 공개 미리 보기로 제공됩니다.

이 문서에서는 Lakeflow 선언적 파이프라인의 SQL 함수를 사용하여 JSON Blob의 스키마를 from_json 유추하고 발전시키는 방법을 설명합니다.

개요

SQL 함수는 from_json JSON 문자열 열을 구문 분석하고 구조체 값을 반환합니다. Lakeflow 선언적 파이프라인 외부에서 사용할 때는 schema 인수를 통해 반환값의 스키마를 명확하게 지정해야 합니다. Lakeflow 선언적 파이프라인과 함께 사용할 경우 스키마 유추 및 진화를 사용하도록 설정하여 반환된 값의 스키마를 자동으로 관리할 수 있습니다. 이 기능은 초기 설정(특히 스키마를 알 수 없는 경우)과 스키마가 자주 변경되는 경우 진행 중인 작업을 모두 간소화합니다. 자동 로더, Kafka 또는 Kinesis와 같은 스트리밍 데이터 원본에서 임의의 JSON Blob을 원활하게 처리할 수 있습니다.

특히 Lakeflow 선언적 파이프라인에서 SQL 함수에 사용되는 경우, 스키마 유추 및 진화는 from_json 다음과 같은 작업을 수행할 수 있습니다.

  • 들어오는 JSON 레코드에서 새 필드 검색(중첩된 JSON 개체 포함)
  • 필드 형식을 유추하고 적절한 Spark 데이터 형식에 매핑
  • 새 필드를 수용하도록 스키마를 자동으로 진화
  • 현재 스키마를 준수하지 않는 데이터를 자동으로 처리합니다.

구문: 스키마를 자동으로 유추 및 진화

Lakeflow 선언적 파이프라인과 함께 사용하는 from_json 경우 스키마를 자동으로 유추하고 발전할 수 있습니다. 이를 사용하도록 설정하려면 스키마를 NULL로 설정하고 옵션을 지정합니다 schemaLocationKey . 이렇게 하면 스키마를 유추하고 추적할 수 있습니다.

SQL (영문)

from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>” [, otherOptions]))

파이썬

from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>”[, otherOptions]})

쿼리에는 여러 from_json 식이 있을 수 있지만 각 식에는 고유한 schemaLocationKey식이 있어야 합니다. schemaLocationKey 파이프라인당 고유해야 합니다.

SQL (영문)

SELECT
  value,
  from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
  from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY,
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

파이썬

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")
    .load("/databricks-datasets/nyctaxi/sample/json/")
    .select(
      col("value"),
      from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
      from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)

구문: 고정 스키마

대신 특정 스키마를 적용하려는 경우 다음 from_json 구문을 사용하여 해당 스키마를 사용하여 JSON 문자열을 구문 분석할 수 있습니다.

from_json(jsonStr, schema, [, options])

이 구문은 Lakeflow 선언적 파이프라인을 비롯한 모든 Azure Databricks 환경에서 사용할 수 있습니다. 자세한 내용은 여기를 참조하세요.

스키마 유추

from_json JSON 데이터 열의 첫 번째 일괄 처리에서 스키마를 유추하고, 내부적으로 schemaLocationKey에 따라 인덱싱합니다 (필수).

JSON 문자열이 단일 개체(예: {"id": 123, "name": "John"}) from_json 인 경우 STRUCT 형식의 스키마를 유추하고 필드 목록에 추가합니다 rescuedDataColumn .

STRUCT<id LONG, name STRING, _rescued_data STRING>

그러나 JSON 문자열에 최상위 배열(예: ["id": 123, "name": "John"]) from_json 이 있는 경우 STRUCT에서 배열을 래핑합니다. 이 방법을 사용하면 유추된 스키마와 호환되지 않는 데이터를 다시 가져올 수 있습니다. 배열 값을 별도의 행 다운스트림으로 분해 하는 옵션이 있습니다.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

스키마 힌트를 사용하여 스키마 유추 재정의

선택적으로 schemaHints를 제공하여 from_json가 열 형식을 유추하는 방식에 영향을 줄 수 있습니다. 열이 특정 데이터 형식임을 알고 있거나 보다 일반적인 데이터 형식(예: 정수 대신 double)을 선택하려는 경우에 유용합니다. SQL 스키마 사양 구문을 사용하여 열 데이터 형식에 대한 임의의 힌트 수를 제공할 수 있습니다. 스키마 힌트의 의미 체계는 자동 로더 스키마 힌트와 동일합니다. 다음은 그 예입니다.

SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT'>)),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING')),
FROM STREAM READ_FILES(...)

JSON 문자열에 최상위 배열이 포함되어 있으면 STRUCT로 래핑됩니다. 이러한 경우 스키마 힌트는 래핑된 STRUCT 대신 배열 스키마에 적용됩니다. 예를 들어 다음과 같은 최상위 배열이 있는 JSON 문자열을 고려합니다.

[{"id": 123, "name": "John"}]

유추된 배열 스키마는 STRUCT에 래핑됩니다.

STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>

데이터 id형식을 변경하려면 스키마 힌트를 STRING으로 element.id 지정합니다. DOUBLE 형식의 새 열을 추가하려면 DOUBLE을 지정합니다 element.new_col . 이러한 힌트로 인해 최상위 JSON 배열의 스키마는 다음과 같습니다.

struct<value array<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>

스키마를 schemaEvolutionMode를 사용하여 진화시킵니다.

from_json 는 데이터를 처리할 때 새 열이 추가되는 것을 감지합니다. 새 필드를 검색하면 from_json 새 열을 스키마의 끝에 병합하여 유추된 스키마를 최신 스키마로 업데이트합니다. 기존 열의 데이터 형식은 변경되지 않은 상태로 유지됩니다. 스키마 업데이트 후 업데이트된 스키마를 사용하여 파이프라인이 자동으로 다시 시작됩니다.

from_json 는 선택적 schemaEvolutionMode 설정을 사용하여 설정한 스키마 진화를 위해 다음 모드를 지원합니다. 이러한 모드는 자동 로더와 일치합니다.

schemaEvolutionMode 새 열을 읽는 동작
addNewColumns(기본값) 스트림이 실패합니다. 새 열이 스키마에 추가됩니다. 기존 열의 데이터 형식은 진화되지 않습니다.
rescue 스키마는 절대 진화되지 않으며 스키마 변경으로 인해 스트림이 실패하지 않습니다. 모든 새 열이 복구된 데이터 열에 기록됩니다.
failOnNewColumns 스트림이 실패합니다. 업데이트되거나 잘못된 데이터가 제거되지 않는 한 schemaHints 스트림이 다시 시작되지 않습니다.
none rescuedDataColumn 옵션을 설정하지 않으면 스키마가 진화하지 않고 새 열이 무시되며 데이터가 복구되지 않습니다. 스키마 변경으로 인해 스트림이 실패하지 않습니다.

다음은 그 예입니다.

SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns')),
FROM STREAM READ_FILES(...)

복구된 데이터 열

구조된 데이터 열은 스키마에 _rescued_data로 자동으로 추가됩니다. 옵션을 설정 rescuedDataColumn 하여 열 이름을 바꿀 수 있습니다. 다음은 그 예입니다.

from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})

구조된 데이터 열을 사용하도록 선택하면 유추된 스키마와 일치하지 않는 모든 열이 삭제되는 대신 구조됩니다. 데이터 형식 불일치, 스키마의 누락된 열 또는 열 이름 대/소문자 차이로 인해 발생할 수 있습니다.

손상된 레코드 처리

형식이 잘못되어 구문 분석할 수 없는 레코드를 저장하려면 다음 예제와 같이 스키마 힌트를 설정하여 열을 추가 _corrupt_record 합니다.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL,
      map('schemaLocationKey', 'nycTaxi',
          'schemaHints', '_corrupt_record STRING',
          'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

손상된 레코드 열의 이름을 바꾸려면 옵션을 설정합니다 columnNameOfCorruptRecord .

JSON 파서는 손상된 레코드를 처리하기 위한 세 가지 모드를 지원합니다.

모드 설명
PERMISSIVE 손상된 레코드의 경우 잘못된 형식의 문자열을 구성한 columnNameOfCorruptRecord 필드에 넣고 형식이 잘못된 필드를 null.로 설정합니다. 손상된 레코드를 유지하려면 사용자 정의 스키마에서 columnNameOfCorruptRecord 문자열 형식 필드를 설정할 수 있습니다. 스키마에 필드가 없으면 구문 분석 중에 손상된 레코드가 삭제됩니다. 스키마를 유추할 때 파서는 출력 스키마에 필드를 암시적으로 추가합니다 columnNameOfCorruptRecord .
DROPMALFORMED 손상된 레코드를 무시합니다.
DROPMALFORMED 모드를 rescuedDataColumn와 함께 사용할 때, 데이터 형식 불일치로 인해 레코드가 삭제되지 않습니다. 불완전하거나 형식이 잘못된 JSON과 같이 손상된 레코드만 삭제됩니다.
FAILFAST 파서가 손상된 레코드를 발견할 때 예외를 발생시킵니다.
FAILFAST 모드를 rescuedDataColumn와 함께 사용하면 데이터 유형 불일치 때문에 오류가 발생하지 않습니다. 손상된 레코드만 오류를 발생시킵니다. 예를 들어, JSON이 불완전하거나 형식이 잘못된 경우가 있습니다.

from_json 출력의 필드를 참조하세요.

from_json 는 파이프라인 실행 중에 스키마를 유추합니다. from_json 함수가 적어도 한 번 이상 성공적으로 실행되기 전에 다운스트림 쿼리가 from_json 필드를 참조하면, 필드가 해결되지 않고 쿼리가 건너뜁니다. 다음 예제에서는 브론즈 쿼리의 from_json 함수가 실행되어 스키마를 유추할 때까지 실버 테이블 쿼리에 대한 분석을 건너뜁니다.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
  SELECT jsonCol.VendorID, jsonCol.total_amount
  FROM bronze

from_json 함수와 함수가 유추하는 필드를 동일한 쿼리에서 참조하는 경우 다음 예제와 같이 분석이 실패할 수 있습니다.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

필드에 대한 참조를 from_json 다운스트림 쿼리(예: 위의 bronze/silver 예제)로 이동하여 이 문제를 해결할 수 있습니다. 또는 참조 schemaHints 된 필드가 포함된 필드를 지정할 from_json 수 있습니다. 다음은 그 예입니다.

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
  WHERE jsonCol.total_amount > 100.0

예: 스키마를 자동으로 유추 및 진화

이 섹션에서는 Lakeflow 선언적 파이프라인에서 자동 스키마 유추 및 진화를 사용하도록 from_json 설정하는 예제 코드를 제공합니다.

클라우드 개체 스토리지에서 스트리밍 테이블 만들기

다음 예제에서는 구문을 사용하여 read_files 클라우드 개체 스토리지에서 스트리밍 테이블을 만듭니다.

SQL (영문)

CREATE STREAMING TABLE bronze AS
  SELECT
    from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
  FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

파이썬

@dlt.table(comment="from_json autoloader example")
def bronze():
  return (
    spark.readStream
         .format("cloudFiles")
         .option("cloudFiles.format", "text")
         .load("/databricks-datasets/nyctaxi/sample/json/")
         .select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)

Kafka에서 스트리밍 테이블 만들기

다음 예제에서는 구문을 사용하여 read_kafka Kafka에서 스트리밍 테이블을 만듭니다.

SQL (영문)

CREATE STREAMING TABLE bronze AS
  SELECT
    value,
    from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol,
  FROM READ_KAFKA(
    bootstrapSevers => '<server:ip>',
    subscribe => 'events',
    "startingOffsets", "latest"
)

파이썬

@dlt.table(comment="from_json kafka example")
def bronze():
  return (
    spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "<server:ip>")
         .option("subscribe", "<topic>")
         .option("startingOffsets", "latest")
         .load()
         .select(col(“value”), from_json(col(“value”), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)

예: 고정 스키마

고정 스키마와 함께 사용하는 from_json 코드 예제는 함수를 참조하세요from_json.

자주 묻는 질문 (FAQ)

이 섹션에서는 함수의 스키마 유추 및 진화 지원에 대한 질문과 대답을 from_json 제공합니다.

from_jsonparse_json의 차이점은 무엇인가요?

parse_json 함수는 JSON 문자열에서 VARIANT 값을 반환합니다.

VARIANT는 반구조화된 데이터를 저장하는 유연하고 효율적인 방법을 제공합니다. 이렇게 하면 엄격한 형식을 완전히 제거하여 스키마 유추 및 진화를 피할 수 있습니다. 그러나 쓰기 시간에 스키마를 적용하려는 경우(예: 비교적 엄격한 스키마가 있기 때문에) from_json 더 나은 옵션이 될 수 있습니다.

다음 표에서는 차이점과 다음의 from_jsonparse_json차이점을 설명합니다.

기능 사용 사례 가용도
from_json 스키마 진화를 통해 from_json 스키마를 유지합니다. 이 기능은 다음과 같은 경우에 유용합니다.
  • 데이터 스키마를 엄격히 적용하려고 합니다(예: 저장하기 전에 모든 스키마 변경 내용을 검토).
  • 스토리지를 최적화하고 낮은 쿼리 대기 시간 및 비용이 필요합니다.
  • 형식이 일치하지 않는 데이터로 인해 실패하는 것이 바람직합니다.
  • 손상된 JSON 레코드에서 부분 결과를 추출하고 잘못된 형식의 레코드를 열에 저장하려고 합니다 _corrupt_record . 반면 VARIANT 수집은 잘못된 JSON에 대한 오류를 반환합니다.
Lakeflow 선언적 파이프라인에서만 스키마 유추 및 진화와 함께 사용 가능
parse_json VARIANT는 스키마화할 필요가 없는 데이터를 보유하는 데 특히 적합합니다. 다음은 그 예입니다.
  • 유연하기 때문에 데이터를 반구조화 상태로 유지하려고 합니다.
  • 스키마 변경이 너무 빨라 자주 스트림 장애와 재시작이 발생하여 스키마로 캐스트하는 것이 어렵습니다.
  • 형식이 일치하지 않는 데이터에 대해 실패하지 않으려는 경우 형식 불일치가 있더라도 유효한 JSON 레코드의 경우 VARIANT 수집이 항상 성공합니다.
  • 사용자는 스키마에 맞지 않는 필드가 포함된 복구된 데이터 열을 처리하는 것을 원하지 않습니다.
Lakeflow 선언적 파이프라인이 포함된 버전과 포함되지 않은 버전으로 제공됩니다.

Lakeflow 선언적 파이프라인 외부에서 스키마 유추 및 진화 구문을 사용할 from_json 수 있나요?

아니요, Lakeflow 선언적 파이프라인 외부에서는 스키마 유추 및 진화 구문을 사용할 from_json 수 없습니다.

유추 from_json된 스키마에 액세스하려면 어떻게 해야 하나요?

대상 스트리밍 테이블의 스키마를 봅니다.

스키마를 전달 from_json 하고 진화를 수행할 수 있나요?

아니요, 스키마를 전달할 from_json 수 없으며 진화도 수행할 수 없습니다. 그러나 스키마 힌트를 제공하여 유추된 from_json필드의 일부 또는 전부를 재정의할 수 있습니다.

테이블이 완전히 새로 고쳐지면 스키마는 어떻게 되나요?

테이블과 연결된 스키마 위치가 지워지고 스키마가 처음부터 다시 유추됩니다.