다음을 통해 공유


이중 쓰기 프록시 및 Apache Spark를 사용하여 Apache Cassandra에서 Azure Cosmos DB for Apache Cassandra로 데이터를 실시간 마이그레이션

Azure Cosmos DB의 Cassandra용 API는 다음과 같은 다양한 이유로 Apache Cassandra에서 실행되는 엔터프라이즈 워크로드에 적합합니다.

  • 관리 및 모니터링의 오버헤드 없음: 운영 체제, Java 가상 머신 및 yaml 파일 및 해당 상호 작용에서 수많은 설정을 관리하고 모니터링하는 오버헤드를 제거합니다.
  • 상당한 비용 절감: 가상 머신, 대역폭 및 해당 라이선스 비용을 포함하는 Azure Cosmos DB를 사용하여 비용을 절감할 수 있습니다. 데이터 센터, 서버, SSD 스토리지, 네트워킹 및 전기 비용을 관리할 필요가 없습니다.
  • 기존 코드 및 도구 사용 가능: Azure Cosmos DB는 기존 Cassandra SDK 및 도구와 유선 프로토콜 수준의 호환성을 제공합니다. 이 호환성을 통해 Azure Cosmos DB for Apache Cassandra에서 사소한 변경으로 기존 코드베이스를 사용할 수 있습니다.

Azure Cosmos DB는 복제를 위한 네이티브 Apache Cassandra 가십 프로토콜을 지원하지 않습니다. 가동 중지 시간이 0인 경우 마이그레이션을 위한 요구 사항이므로 다른 접근 방식이 필요합니다. 이 자습서에서는 이중 쓰기 프록시Apache Spark를 사용하여 네이티브 Apache Cassandra 클러스터에서 Azure Cosmos DB for Apache Cassandra로 데이터를 실시간 마이그레이션하는 방법을 설명합니다.

다음 이미지는 이 패턴을 보여줍니다. 이중 쓰기 프록시는 라이브 변경 내용을 캡처하는 데 사용됩니다. 기록 데이터는 Apache Spark를 사용하여 대량으로 복사됩니다. 프록시가 최소한의 구성 변경으로 또는 구성 변경 없이 애플리케이션 코드의 연결을 허용할 수 있습니다. 모든 요청을 원본 데이터베이스로 라우팅하고 대량 복사가 진행되는 동안 Cassandra용 API에 쓰기를 비동기적으로 라우팅합니다.

데이터를 Azure Managed Instance for Apache Cassandra로 실시간 마이그레이션하는 방법을 보여 주는 애니메이션

필수 조건

중요합니다

마이그레이션 중에 Apache Cassandra writetime을 보존해야 하는 요구 사항이 있는 경우 테이블을 만들 때 다음 플래그를 설정해야 합니다.

with cosmosdb_cell_level_timestamp=true and cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true

예시:

CREATE KEYSPACE IF NOT EXISTS migrationkeyspace WITH REPLICATION= {'class': 'org.apache.> cassandra.locator.SimpleStrategy', 'replication_factor' : '1'};
CREATE TABLE IF NOT EXISTS migrationkeyspace.users (
 name text,
 userID int,
 address text,
 phone int,
 PRIMARY KEY ((name), userID)) with cosmosdb_cell_level_timestamp=true and > cosmosdb_cell_level_timestamp_tombstones=true and cosmosdb_cell_level_timetolive=true;

Spark 클러스터 프로비저닝

Azure Databricks를 사용하는 것이 좋습니다. Spark 3.0 이상을 지원하는 런타임을 사용합니다.

중요합니다

Azure Databricks 계정과 원본 Apache Cassandra 클러스터 간에 네트워크 연결이 설정되었는지 확인해야 합니다. 이 설정에는 가상 네트워크 주입이 필요할 수 있습니다. 자세한 내용은 Azure Virtual Network에 Azure Databricks 배포를 참조하세요.

Azure Databricks 런타임 버전을 찾는 방법을 보여 주는 스크린샷

Spark 종속성 추가

Apache Spark Cassandra 커넥터 라이브러리를 클러스터에 추가하여 네이티브 및 Azure Cosmos DB Cassandra 엔드포인트 모두에 연결합니다. 클러스터에서 라이브러리>새로 설치>Maven을 선택한 다음 Maven 좌표에 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0을 추가합니다.

중요합니다

마이그레이션하는 동안 각 행의 Apache Cassandra writetime을 보존해야 하는 요구 사항이 있는 경우 이 샘플을 사용하는 것이 좋습니다. 이 샘플의 종속성 JAR에는 Spark 커넥터도 포함되어 있으므로 앞에서 설명한 커넥터 어셈블리 대신 이 버전을 설치해야 합니다.

이 샘플은 기록 데이터 로드가 완료된 후에 원본과 대상 간에 행 비교 유효성 검사를 수행하려는 경우에도 유용합니다. 자세한 내용은 기록 데이터 로드 실행을 참조하고 원본 및 대상의 유효성을 검사합니다.

Azure Databricks에서 Maven 패키지를 검색하는 방법을 보여 주는 스크린샷

설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.

참고 항목

Cassandra 커넥터 라이브러리가 설치된 후 Azure Databricks 클러스터를 다시 시작해야 합니다.

이중 쓰기 프록시 설치

이중 쓰기 중에 최적의 성능을 위해 원본 Cassandra 클러스터의 모든 노드에 프록시를 설치하는 것이 좋습니다.

#assuming you do not have git already installed
sudo apt-get install git 

#assuming you do not have maven already installed
sudo apt install maven

#clone repo for dual-write proxy
git clone https://github.com/Azure-Samples/cassandra-proxy.git

#change directory
cd cassandra-proxy

#compile the proxy
mvn package

이중 쓰기 프록시 시작

원본 Cassandra 클러스터의 모든 노드에 프록시를 설치하는 것이 좋습니다. 최소한 다음 명령을 실행하여 각 노드에서 프록시를 시작합니다. <target-server>를 대상 클러스터에 있는 노드 중 하나의 IP 또는 서버 주소로 바꿉니다. <path to JKS file>을 로컬 .jks 파일 경로로 바꾸고, <keystore password>를 해당 암호로 바꿉니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password>

이 방법으로 프록시를 시작하면 다음 조건이 충족된다고 가정합니다.

  • 원본 및 대상 엔드포인트의 사용자 이름과 암호가 동일합니다.
  • 원본 및 대상 엔드포인트가 SSL(Secure Sockets Layer)을 구현합니다.

원본 및 대상 엔드포인트가 이러한 조건을 충족할 수 없는 경우 계속해서 추가 구성 옵션을 살펴보세요.

SSL 구성

SSL의 경우 원본 클러스터에서 사용하는 것과 같은 기존 키 저장소를 구현하거나 다음을 사용하여 keytool자체 서명된 인증서를 만들 수 있습니다.

keytool -genkey -keyalg RSA -alias selfsigned -keystore keystore.jks -storepass password -validity 360 -keysize 2048

SSL을 구현하지 않는 경우 원본 또는 대상 엔드포인트에 대해 SSL을 사용하지 않도록 설정할 수도 있습니다. --disable-source-tls 또는 --disable-target-tls 플래그를 사용합니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> \
  --target-password <password> --disable-source-tls true  --disable-target-tls true 

참고 항목

프록시를 통해 데이터베이스에 대한 SSL 연결을 빌드할 때 클라이언트 애플리케이션이 이중 쓰기 프록시에 사용되는 것과 동일한 키 저장소 및 암호를 사용하는지 확인합니다.

자격 증명 및 포트 구성

기본적으로 클라이언트 앱은 원본 자격 증명을 전달합니다. 프록시는 자격 증명을 사용하여 원본 및 대상 클러스터에 연결합니다. 앞에서 설명한 것처럼 이 프로세스에서는 원본 및 대상 자격 증명이 동일하다고 가정합니다. 프록시를 시작할 때 Cassandra 엔드포인트의 대상 API에 대해 다른 사용자 이름과 암호를 별도로 지정해야 합니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --proxy-jks-file <path to JKS file> --proxy-jks-password <keystore password> \
  --target-username <username> --target-password <password>

지정하지 않은 경우 기본 원본 및 대상 포트는 9042입니다. 이 경우 Cassandra용 API는 포트 10350에서 실행됩니다. --source-port 또는 --target-port를 사용하여 포트 번호를 지정합니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar localhost <target-server> \
  --source-port 9042 --target-port 10350 --proxy-jks-file <path to JKS file> \
  --proxy-jks-password <keystore password> --target-username <username> --target-password <password>

원격으로 프록시 배포

클러스터 노드 자체에 프록시를 설치하지 않으려는 상황이 있을 수 있습니다. 별도의 컴퓨터에 설치하는 것이 좋습니다. 이 시나리오에서 다음의 <source-server>IP 주소를 지정합니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar <source-server> <destination-server>

경고

원본 Apache Cassandra 클러스터의 모든 노드에서 프록시를 실행하지 않고 별도의 컴퓨터에 원격으로 프록시를 설치하고 실행하면 실시간 마이그레이션이 발생하는 동안 성능에 영향을 줍니다. 이 구성은 기능적으로 작동하지만 클라이언트 드라이버는 클러스터 내의 모든 노드에 대한 연결을 열 수 없습니다. 클라이언트는 프록시가 설치된 단일 코디네이터 노드를 사용하여 연결합니다.

제로(0) 애플리케이션 코드 변경 허용

기본적으로 프록시는 포트 29042에서 수신 대기합니다. 이 포트를 가리키도록 애플리케이션 코드를 변경합니다. 대신 프록시가 수신 대기하는 포트를 변경할 수 있습니다. 다음을 통해 애플리케이션 수준 코드 변경을 제거하려는 경우 이 변경을 수행할 수 있습니다.

  • 원본 Cassandra 서버가 다른 포트에서 실행되도록 합니다.
  • 프록시가 표준 Cassandra 포트 9042에서 실행되도록 합니다.
java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server --proxy-port 9042

참고 항목

클러스터 노드에 프록시를 설치해도 노드를 다시 시작할 필요가 없습니다. 애플리케이션 클라이언트가 많고 애플리케이션 수준 코드 변경을 제거하기 위해 표준 Cassandra 포트 9042에서 프록시를 실행하려는 경우 Apache Cassandra 기본 포트를 변경합니다. 그런 다음 클러스터의 노드를 다시 시작하고 원본 포트를 원본 Cassandra 클러스터에 대해 정의한 새 포트로 구성해야 합니다.

다음 예제에서는 포트 3074에서 실행되도록 원본 Cassandra 클러스터를 변경하고 포트 9042에서 클러스터를 시작합니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
 --proxy-port 9042 --source-port 3074

프로토콜 강제 적용

프록시에는 원본 엔드포인트가 대상보다 고급인 경우에 필요하고, 그러지 않으면 지원되지 않는 프로토콜 강제 적용 기능이 있습니다. 해당하는 경우 --protocol-version--cql-version을 지정하여 프로토콜이 대상을 따르도록 강제 적용할 수 있습니다.

java -jar target/cassandra-proxy-1.0-SNAPSHOT-fat.jar source-server destination-server \
  --protocol-version 4 --cql-version 3.11

이중 쓰기 프록시가 실행되면 애플리케이션 클라이언트에서 포트를 변경하고 다시 시작해야 합니다. 또는 Cassandra 포트를 변경하고 해당 방법을 선택하면 클러스터를 다시 시작합니다. 프록시는 대상 엔드포인트에 쓰기를 전달하기 시작합니다. 자세한 내용은 모니터링 및 메트릭을 참조하세요.

기록 데이터 로드 실행

데이터를 로드하려면 Azure Databricks 계정에 Scala Notebook을 만듭니다. 원본 및 대상 Cassandra 구성을 해당 자격 증명으로 바꾸고, 원본 및 대상 키스페이스와 테이블을 바꿉니다. 각 테이블에 대한 변수를 필요한 만큼 다음 샘플에 더 추가한 후 실행합니다. 애플리케이션에서 이중 쓰기 프록시로 요청을 보내기 시작하면 기록 데이터를 마이그레이션할 수 있습니다.

중요합니다

데이터를 마이그레이션하기 전에 컨테이너 처리량을 애플리케이션이 신속하게 마이그레이션하는 데 필요한 양으로 늘입니다. 마이그레이션을 시작하기 전에 처리량을 조정하면 데이터를 더 적은 시간 안에 마이그레이션할 수 있습니다. 기록 데이터 로드 중 속도 제한으로부터 보호하기 위해 Cassandra용 API에서 SSR(서버 쪽 재시도)을 사용하도록 설정할 수 있습니다. SSR을 사용하도록 설정하는 방법에 대한 지침 및 자세한 내용은 Apache Cassandra 작업에 대한 Azure Cosmos DB에 대한 속도 제한 오류 방지를 참조하세요.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "10350",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "1",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//set timestamp to ensure it is before read job starts
val timestamp: Long = System.currentTimeMillis / 1000

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .option("writetime", timestamp)
  .mode(SaveMode.Append)
  .save

참고 항목

이전 Scala 샘플에서는 원본 테이블의 모든 데이터를 읽기 전에 timestamp가 현재 시간으로 설정되고 있음을 알 수 있습니다. writetime은 이 소급 적용된 타임스탬프로 설정됩니다. 이 방법을 사용하면 기록 데이터 로드에서 대상 엔드포인트로 작성된 레코드가 기록 데이터를 읽는 동안 이중 쓰기 프록시에서 이후 타임스탬프로 제공되는 업데이트를 덮어쓸 수 없습니다.

중요합니다

어떤 이유로든 ‘정확한’ 타임스탬프를 유지해야 하는 경우 이 샘플처럼 타임스탬프를 유지하는 기록 데이터 마이그레이션 방법을 사용해야 합니다. 샘플의 종속성 JAR에도 Spark 커넥터가 포함되어 있으므로 이전 필수 구성 요소에서 언급한 Spark 커넥터 어셈블리를 설치할 필요가 없습니다. 둘 다 Spark 클러스터에 설치하면 충돌이 발생합니다.

원본 및 대상의 유효성 검사

기록 데이터 로드가 완료되면 데이터베이스가 동기화되고 중단할 준비가 된 것입니다. 원본 및 대상의 유효성을 검사하여 최종적으로 잘라내기 전에 일치하는지 확인하는 것이 좋습니다.

참고 항목

이전에 언급한 Cassandra 마이그레이션기 샘플을 보존에 writetime사용한 경우 이 샘플에는 특정 허용 오차에 따라 원본 및 대상의 행을 비교하여마이그레이션의 유효성을 검사하는 기능이 포함되어 있습니다.

다음 단계