적용 대상: NoSQL
이 자습서에서는 Azure Cosmos DB Spark 커넥터를 사용하여 Cosmos DB for NoSQL 계정에서 데이터를 읽거나 씁니다. 이 자습서에서는 Azure Databricks 및 Jupyter Notebook을 사용하여 Spark에서 API for NoSQL과 통합하는 방법을 설명합니다. 이 자습서에서는 Python과 Scala에 중점을 두지만 Spark에서 지원하는 모든 언어나 인터페이스를 사용할 수 있습니다.
이 자습서에서는 다음을 하는 방법을 알아볼 수 있습니다.
- Spark 및 Jupyter Notebook을 사용하여 API for NoSQL 계정에 연결합니다.
- 데이터베이스 및 컨테이너 리소스를 만듭니다.
- 컨테이너에 데이터를 수집합니다.
- 컨테이너의 데이터를 쿼리합니다.
- 컨테이너의 항목에 대한 일반적인 작업을 수행합니다.
Prerequisites
- 기존 Azure Cosmos DB API for NoSQL 계정.
- 기존 Azure 구독이 있는 경우 새 계정을 만듭니다.
- 기존 Azure Databricks 작업 영역
Spark 및 Jupyter를 사용하여 연결
기존 Azure Databricks 작업 영역을 사용하여 Apache Spark 3.4.x를 사용하여 Azure Cosmos DB for NoSQL 계정에 연결할 준비가 된 컴퓨팅 클러스터를 만듭니다.
Azure Databricks 작업 영역을 엽니다.
작업 영역 인터페이스에서 새 클러스터를 만듭니다. 최소한 다음 설정을 사용하여 클러스터를 구성합니다.
Version Value 런타임 버전 13.3 LTS(Scala 2.12, Spark 3.4.1) 작업 영역 인터페이스를 사용하여 그룹 ID가 있는 Maven Central에서
com.azure.cosmos.spark패키지를 검색합니다. 클러스터에 접두사로 추가된azure-cosmos-spark_3-4를 사용하여 Spark 3.4용 패키지를 설치합니다.마지막으로 새 Notebook을 만듭니다.
Tip
기본적으로 Notebook은 최근 만들어진 클러스터에 연결됩니다.
Notebook 내에서 NoSQL 계정 엔드포인트, 데이터베이스 이름 및 컨테이너 이름에 대한 OLTP(온라인 트랜잭션 처리) 구성 설정을 지정합니다.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
데이터베이스 및 컨테이너 만들기
카탈로그 API를 사용하여 데이터베이스 및 컨테이너와 같은 계정 리소스를 관리합니다. 그런 다음 OLTP를 사용하여 컨테이너 리소스 내의 데이터를 관리할 수 있습니다.
Spark를 사용하여 API for NoSQL 리소스를 관리하도록 카탈로그 API를 구성합니다.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))cosmicworks를 사용하여CREATE DATABASE IF NOT EXISTS라는 새 데이터베이스를 만듭니다.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")products를 사용하여CREATE TABLE IF NOT EXISTS라는 새 컨테이너를 만듭니다. 파티션 키 경로를/category로 설정하고 초당 최대1000RU(요청 단위) 처리량을 사용하여 자동 크기 조정 처리량을 사용하도록 설정해야 합니다.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))계층적 파티션 키 구성을 사용하여
employees라는 다른 컨테이너를 만듭니다. 파티션 키 경로 집합으로/organization,/department및/team을 사용합니다. 특정 순서를 따릅니다. 또한 처리량을 수동 RU 수400로 설정합니다.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Notebook 셀을 실행하여 데이터베이스 및 컨테이너가 API for NoSQL 계정 내에서 생성되는지 확인합니다.
데이터 수집
샘플 데이터 세트를 만듭니다. 그런 다음 OLTP를 사용하여 해당 데이터를 API for NoSQL 컨테이너에 수집합니다.
샘플 데이터 세트를 만듭니다.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )spark.createDataFrame및 이전에 저장된 OLTP 구성을 사용하여 대상 컨테이너에 샘플 데이터를 추가합니다.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
쿼리 데이터
OLTP 데이터를 데이터 프레임에 로드하여 데이터에 대한 일반적인 쿼리를 수행합니다. 다양한 구문을 사용하여 데이터를 필터링하거나 쿼리할 수 있습니다.
OLTP 데이터를 데이터 프레임 개체에 로드하려면
spark.read를 사용합니다. 이 자습서의 앞부분에서 사용한 것과 동일한 구성을 사용합니다. 또한spark.cosmos.read.inferSchema.enabled를true로 설정하여 Spark 커넥터가 기존 항목을 샘플링하여 스키마를 유추할 수 있도록 합니다.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()printSchema를 사용하여 데이터 프레임에 로드된 데이터의 스키마를 렌더링합니다.# Render schema df.printSchema()// Render schema df.printSchema()quantity열이20보다 작은 데이터 행을 렌더링합니다.where및show함수를 사용하여 이 쿼리를 수행합니다.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()clearance열이true인 첫 번째 데이터 행을 렌더링합니다.filter함수를 사용하여 이 쿼리를 수행합니다.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)필터 또는 잘림 없이 5개의 데이터 행을 렌더링합니다.
show함수를 사용하여 렌더링되는 행의 모양과 수를 사용자 지정합니다.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)다음 원시 NoSQL 쿼리 문자열을 사용하여 데이터를 쿼리합니다.
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
일반 작업 수행
Spark에서 API for NoSQL 데이터로 작업할 때 부분 업데이트를 수행하거나 데이터를 원시 JSON으로 작업할 수 있습니다.
항목의 부분 업데이트를 수행하려면 다음을 수행합니다.
기존
config구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 쓰기 전략을ItemPatch로 구성합니다. 그런 다음 대량 지원을 사용하지 않도록 설정합니다. 열과 매핑된 작업을 설정합니다. 마지막으로 기본 작업 유형을Set로 설정합니다.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )이 패치 작업의 일부로서 대상으로 지정할 항목 파티션 키 및 고유 식별자에 대한 변수를 만듭니다.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"패치 개체 집합을 만들어 대상 항목을 지정하고 수정해야 하는 필드를 지정합니다.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )패치 개체 집합을 사용하여 데이터 프레임을 만듭니다. 패치 작업을 수행하려면
write를 사용합니다.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()쿼리를 실행하여 패치 작업의 결과를 검토합니다. 이제 다른 변경 내용 없이 항목의 이름을
Yamba New Surfboard로 지정해야 합니다.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
원시 JSON 데이터로 작업하려면 다음을 수행합니다.
기존
config구성 변수를 복사하고 새 복사본의 속성을 수정합니다. 특히 대상 컨테이너를employees로 변경합니다. 그런 다음 원시 JSON 데이터를 사용하도록contacts열/필드를 구성합니다.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )컨테이너에 수집할 직원 집합을 만듭니다.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )데이터 프레임을 만들고
write를 사용하여 직원 데이터를 수집합니다.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()show를 사용하여 데이터 프레임에서 데이터를 렌더링합니다. 출력에서contacts열이 원시 JSON인지 확인합니다.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
관련 콘텐츠
- Apache Spark
- Azure Cosmos DB 카탈로그 API
- 구성 매개 변수 참조
- Azure Cosmos DB Spark 커넥터 샘플
- Spark 2.4에서 Spark 3으로 마이그레이션*
- 사용되지 않는 버전:
- 더 이상 사용할 수 있는 Azure Databricks, Azure Synapse 또는 Azure HDInsight에서 지원되는 Spark 3.1 또는 3.2 런타임이 없으므로 Spark 3.1 및 3.2용 Azure Cosmos DB Spark 커넥터는 더 이상 사용되지 않습니다.
- Spark 3.1에서 업데이트하는 마이그레이션 가이드
- Spark 3.2에서 업데이트하는 마이그레이션 가이드
- 버전 호환성:
- 릴리스 정보:
- 다운로드 링크: