다음을 통해 공유


Azure Synapse Link에서 Apache Spark 3을 사용하여 Azure Cosmos DB와 상호 작용

이 문서에서는 Synapse Apache Spark 3을 사용하여 Azure Cosmos DB와 상호 작용하는 방법을 알아봅니다. 고객은 Azure Cosmos DB용 Azure Synapse Link의 분석, 데이터 엔지니어링, 데이터 과학 및 데이터 탐색 시나리오에 Scala, Python, SparkSQL 및 C#을 사용할 수 있습니다.

Azure Cosmos DB와 상호 작용하는 동안 다음 기능이 지원됩니다.

  • Synapse Apache Spark 3을 사용하면 트랜잭션 워크로드의 성능에 영향을 주지 않고 Azure Synapse Link를 통해 설정된 Azure Cosmos DB 컨테이너의 데이터를 근 실시간으로 분석할 수 있습니다. 다음 두 옵션을 사용하여 Spark에서 Azure Cosmos DB 분석 저장소를 쿼리할 수 있습니다.
    • Spark 데이터 프레임에 로드
    • Spark 테이블 만들기
  • Synapse Apache Spark를 사용하면 Azure Cosmos DB로 데이터를 수집할 수도 있습니다. 데이터는 항상 트랜잭션 저장소를 통해 Azure Cosmos DB 컨테이너로 수집됩니다. Azure Synapse Link를 사용하도록 설정하면 새 삽입, 업데이트 및 삭제가 분석 저장소에 자동으로 동기화됩니다.
  • Synapse Apache Spark는 Azure Cosmos DB를 원본 및 싱크로 사용하는 Spark 구조적 스트리밍도 지원합니다.

다음 섹션에서는 구문을 안내합니다. Azure Synapse Analytics용 Apache Spark를 사용하여 Azure Cosmos DB를 쿼리하는 방법에 대한 Learn 모듈을 확인할 수도 있습니다. Azure Synapse Analytics 작업 영역의 제스처는 즉시 시작할 수 있는 간편한 환경을 제공하도록 설계되었습니다. Synapse 작업 영역의 데이터 탭에서 Azure Cosmos DB 컨테이너를 마우스 오른쪽 단추로 클릭하면 제스처가 표시됩니다. 제스처를 사용하면 코드를 빠르게 생성하고 필요에 맞게 조정할 수 있습니다. 제스처는 한 번의 클릭으로 데이터를 검색하는 데에도 적합합니다.

Important

데이터 로드 작업에서 예기치 않은 동작이 발생할 수 있는 분석 스키마의 몇 가지 제약 조건을 알고 있어야 합니다. 예를 들어 트랜잭션 스키마의 처음 1,000개 속성만 분석 스키마에서 사용할 수 있으며 공백이 있는 속성은 사용할 수 없습니다. 예기치 않은 결과가 발생하는 경우 분석 저장소 스키마 제약 조건을 확인하여 자세한 내용을 확인하세요.

Azure Cosmos DB 분석 저장소 쿼리

고객은 분석 저장소 데이터를 Spark DataFrames에 로드하거나 Spark 테이블을 만들 수 있습니다.

환경의 차이점은 Azure Cosmos DB 컨테이너의 기본 데이터 변경 내용이 Spark에서 수행되는 분석에 자동으로 반영되는지 여부와 관련이 있습니다. Spark DataFrame이 등록되거나 Spark 테이블이 만들어지면 Spark는 효율적인 푸시다운을 위해 분석 저장소 메타데이터를 가져옵니다. Spark는 지연 평가 정책을 따르므로 주의해야 합니다. Spark DataFrames 또는 SparkSQL 쿼리에서 데이터의 마지막 스냅샷을 가져오는 작업을 수행해야 합니다.

Spark 데이터 프레임 로드를 선택하는 경우 페치된 메타데이터는 Spark 세션의 수명 추기 동안 캐시되므로 데이터 프레임에서 호출된 후속 작업은 데이터 프레임 생성 시 분석 저장소의 스냅샷에 대해 평가됩니다.

반면에 Spark 테이블을 만드는 경우 분석 저장소 상태의 메타데이터는 Spark에 캐시되지 않으며 Spark 테이블에 대한 모든 SparkSQL 쿼리 실행 시 다시 로드됩니다.

결론적으로 Spark DataFrame에 스냅샷을 로드하거나 Spark 테이블에서 최신 스냅샷을 쿼리하는 중에서 선택할 수 있습니다.

Note

Azure Cosmos DB for MongoDB 계정을 쿼리하려면 분석 저장소의 완전 충실도 스키마 표현과 사용할 확장 속성 이름에 대해 자세히 알아봅니다.

Note

모든 options는 대/소문자를 구분합니다.

Authentication

이제 Spark 3.x 고객은 신뢰할 수 있는 ID 액세스 토큰 또는 데이터베이스 계정 키를 사용하여 Azure Cosmos DB 분석 저장소에 인증할 수 있습니다. 토큰은 수명이 짧기 때문에 더 안전하며 Cosmos DB RBAC를 사용하여 필요한 권한에 할당됩니다.

이제 커넥터는 MasterKey 속성에 대해 두 가지 인증 형식, AccessTokenspark.cosmos.auth.type를 지원합니다.

마스터 키 인증

키를 사용하여 spark를 사용하여 데이터 프레임을 읽습니다.

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.accountKey" -> "<key>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

액세스 토큰 인증

새로운 키 없는 인증은 액세스 토큰에 대한 지원을 도입합니다.

val config = Map(
    "spark.cosmos.accountEndpoint" -> "<endpoint>",
    "spark.cosmos.auth.type" -> "AccessToken",
    "spark.cosmos.auth.accessToken" -> "<accessToken>",
    "spark.cosmos.database" -> "<db>",
    "spark.cosmos.container" -> "<container>"
)

val df = spark.read.format("cosmos.olap").options(config).load()
df.show(10)

Note

Azure Cosmos DB의 Azure Synapse Link Spark 커넥터는 관리 ID를 지원하지 않습니다.

액세스 토큰 인증에는 역할 할당이 필요합니다.

액세스 토큰 접근 방식을 사용하려면 액세스 토큰을 생성해야 합니다. 액세스 토큰은 Azure ID와 연결되므로 올바른 RBAC(역할 기반 액세스 제어)를 ID에 할당해야 합니다. 역할 할당은 데이터 평면 수준에 있으며 역할 할당을 수행하려면 최소 컨트롤 플레인 권한이 있어야 합니다.

Azure Portal의 IAM(ID 액세스 관리) 역할 할당은 컨트롤 플레인 수준에 있으며 데이터 평면의 역할 할당에는 영향을 주지 않습니다. 데이터 평면 역할 할당은 Azure CLI를 통해서만 사용할 수 있습니다. 이 readAnalytics 작업은 Cosmos DB의 분석 저장소에서 데이터를 읽는 데 필요하며 미리 정의된 역할의 일부가 아닙니다. 따라서 사용자 지정 역할 정의를 만들어야 합니다. readAnalytics 동작과 함께 데이터 판독기에 필요한 동작도 추가합니다. 다음 내용을 포함한 JSON 파일을 생성하여 파일 이름을 role_definition.json으로 지정하세요.

{
  "RoleName": "CosmosAnalyticsRole",
  "Type": "CustomRole",
  "AssignableScopes": ["/"],
  "Permissions": [{
    "DataActions": [
      "Microsoft.DocumentDB/databaseAccounts/readAnalytics",
      "Microsoft.DocumentDB/databaseAccounts/readMetadata",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/executeQuery",
      "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed"
    ]
  }]
}

액세스 토큰 인증에는 Azure CLI가 필요합니다.

  • Azure CLI에 로그인합니다. az login
  • Cosmos DB 계정이 있는 기본 구독을 설정합니다. az account set --subscription <name or id>
  • 원하는 Cosmos DB 계정에서 역할 정의를 만듭니다. az cosmosdb sql role definition create --account-name <cosmos-account-name> --resource-group <resource-group-name> --body @role_definition.json
  • 반환된 역할 definition id를 복사함: /subscriptions/<subscription-id>/resourceGroups/<resource-group-name>/providers/Microsoft.DocumentDB/databaseAccounts/< cosmos-account-name >/sqlRoleDefinitions/<a-random-generated-guid>
  • 역할을 할당하려는 아이덴티티의 프로필 ID를 확보합니다. ID는 Azure 앱 등록, 가상 머신 또는 지원되는 다른 Azure 리소스일 수 있습니다. 다음을 사용하여 주체에 역할을 할당합니다. az cosmosdb sql role assignment create --account-name "<cosmos-account-name>" --resource-group "<resource-group>" --scope "/" --principal-id "<principal-id-of-identity>" --role-definition-id "<role-definition-id-from-previous-step>"

Note

Azure 앱 등록을 사용하는 경우 서비스 주체 ID로 사용합니다 Object Id . 또한, 주체 ID와 Cosmos DB 계정은 동일한 테넌트에 있어야 합니다.

액세스 토큰 생성 - Synapse Notebooks

Synapse Notebooks에 권장되는 방법은 인증서와 함께 서비스 주체를 사용하여 액세스 토큰을 생성하는 것입니다. 자세한 내용을 보려면 여기를 클릭하세요.

The following code snippet has been validated to work in a Synapse notebook
val tenantId = "<azure-tenant-id>"
val clientId = "<client-id-of-service-principal>"
val kvLinkedService = "<azure-key-vault-linked-service>"
val certName = "<certificate-name>"
val token = mssparkutils.credentials.getSPTokenWithCertLS(
  "https://<cosmos-account-name>.documents.azure.com/.default",
  "https://login.microsoftonline.com/" + tenantId, clientId, kvLinkedService, certName)

이제 인증 유형이 액세스 토큰으로 설정된 경우 이 단계에서 생성된 액세스 토큰을 사용하여 분석 저장소에서 데이터를 읽을 수 있습니다.

Note

Azure 앱 등록을 사용하는 경우 애플리케이션(클라이언트 ID)을 사용합니다.

Note

현재 Synapse는 Notebook에서 azure-identity 패키지를 사용하여 액세스 토큰 생성을 지원하지 않습니다. 또한 synapse VHD에는 azure-identity 패키지 및 해당 종속성이 포함되지 않습니다. 자세한 내용을 보려면 여기를 클릭하세요.

Spark 데이터 프레임에 로드

이 예제에서는 Azure Cosmos DB 분석 저장소를 가리키는 Spark DataFrame을 만듭니다. 그런 다음, DataFrame에 대해 Spark 작업을 호출하여 더 많은 분석을 수행할 수 있습니다. 이 작업은 트랜잭션 저장소에 영향을 주지 않습니다.

Python의 구문은 다음과 같습니다.

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Scala의 해당 구문은 다음과 같습니다.

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Spark 테이블 만들기

이 예제에서는 Azure Cosmos DB 분석 저장소를 가리키는 Spark 테이블을 만듭니다. 그런 다음 테이블에 대해 SparkSQL 쿼리를 호출하여 더 많은 분석을 수행할 수 있습니다. 이 작업은 트랜잭션 저장소에 영향을 주지 않으며 데이터 이동이 발생하지 않습니다. 이 Spark 테이블을 삭제하려는 경우 기본 Azure Cosmos DB 컨테이너 및 해당 분석 저장소는 영향을 받지 않습니다.

이 시나리오는 타사 도구를 통해 Spark 테이블을 다시 사용하고 런타임에 대한 기본 데이터에 대한 액세스를 제공하는 데 편리합니다.

Spark 테이블을 만드는 구문은 다음과 같습니다.

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Note

기본 Azure Cosmos DB 컨테이너의 스키마가 시간에 따라 변경되는 시나리오가 있으며 업데이트된 스키마가 Spark 테이블에 대한 쿼리에 자동으로 반영되기를 원하는 경우 Spark 테이블 옵션에서 spark.cosmos.autoSchemaMerge 옵션을 true로 설정하여 목표를 달성할 수 있습니다.

Azure Cosmos DB 컨테이너에 Spark 데이터 프레임 쓰기

이 예제에서는 Azure Cosmos DB 컨테이너에 Spark DataFrame을 작성합니다. 이 작업은 트랜잭션 워크로드의 성능에 영향을 미치고 Azure Cosmos DB 컨테이너 또는 공유 데이터베이스에 프로비전된 요청 단위를 사용합니다.

Python의 구문은 다음과 같습니다.

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Scala의 해당 구문은 다음과 같습니다.

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

컨테이너에서 스트리밍 DataFrame 로드

이 제스처에서는 Spark 스트리밍 기능을 사용하여 컨테이너에서 데이터 프레임으로 데이터를 로드합니다. 데이터는 작업 영역에 연결한 기본 데이터 레이크 계정(및 파일 시스템)에 저장됩니다.

Note

Synapse Apache Spark에서 외부 라이브러리를 참조하려는 경우 여기에서 자세히 알아보세요. 예를 들어 Azure Cosmos DB for MongoDB 컨테이너에 Spark DataFrame을 수집하려는 경우 여기에서 Spark용 MongoDB 커넥터를 사용할 수 있습니다.

Azure Cosmos DB 컨테이너에서 스트리밍 데이터 프레임 로드

이 예제에서는 Azure Cosmos DB의 변경 피드 기능을 사용하여 Spark의 구조적 스트리밍을 사용하여 Azure Cosmos DB 컨테이너의 데이터를 Spark 스트리밍 데이터 프레임으로 로드합니다. Spark에서 사용하는 검사점 데이터는 작업 영역에 연결된 기본 데이터 레이크 계정(및 파일 시스템)에 저장됩니다.

Python의 구문은 다음과 같습니다.

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Scala의 해당 구문은 다음과 같습니다.

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Azure Cosmos DB 컨테이너에 스트리밍 데이터 프레임 쓰기

이 예제에서는 스트리밍 DataFrame을 Azure Cosmos DB 컨테이너에 씁니다. 이 작업은 트랜잭션 워크로드의 성능에 영향을 미치고 Azure Cosmos DB 컨테이너 또는 공유 데이터베이스에 프로비전된 요청 단위를 사용합니다. /localWriteCheckpointFolder 폴더가 만들어지지 않은 경우(아래 예제에서) 자동으로 만들어집니다.

Python의 구문은 다음과 같습니다.

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Scala의 해당 구문은 다음과 같습니다.

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

다음 단계