다음을 통해 공유


Apache Spark를 사용하여 Azure Managed Instance for Apache Cassandra로 마이그레이션

가능하면 하이브리드 클러스터를 구성하여 기존 클러스터에서 Azure Managed Instance for Apache Cassandra로 데이터를 마이그레이션할 때 Apache Cassandra 네이티브 복제를 사용하는 것이 좋습니다. 이 방법에서는 Apache Cassandra의 가십 프로토콜을 사용하여 원본 데이터 센터에서 새로운 관리형 인스턴스 데이터 센터로 데이터를 복제합니다. 그러나 원본 데이터베이스 버전이 호환되지 않거나 하이브리드 클러스터를 설정할 수 없는 경우가 있을 수 있습니다.

이 자습서에서는 Cassandra Spark 커넥터 및 Azure Databricks for Apache Spark를 사용하여 오프라인 방식으로 Azure Managed Instance for Apache Cassandra로 데이터를 마이그레이션하는 방법을 설명합니다.

필수 조건

Azure Databricks 클러스터 프로비전

Spark 3.0을 지원하는 Databricks Runtime 버전 7.5를 선택하는 것이 좋습니다.

Databricks Runtime 버전을 찾는 방법을 보여 주는 스크린샷.

종속성 추가

Apache Spark Cassandra 커넥터 라이브러리를 클러스터에 추가하여 네이티브 및 Azure Cosmos DB Cassandra 엔드포인트 모두에 연결합니다. 클러스터에서 라이브러리>새로 설치>Maven을 선택한 다음 Maven 좌표에 com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0을 추가합니다.

Databricks에서 Maven 패키지를 검색하는 방법을 보여 주는 스크린샷.

설치를 선택한 다음 설치가 완료되면 클러스터를 다시 시작합니다.

참고 항목

Cassandra Connector 라이브러리가 설치된 후 Databricks 클러스터를 다시 시작해야 합니다.

마이그레이션을 위한 Scala Notebook 만들기

Databricks에서 Scala Notebook을 만듭니다. 원본 및 대상 Cassandra 구성을 해당 자격 증명, 원본 및 대상 키 공간과 테이블로 바꿉니다. 다음 코드를 실행합니다.

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext

// source cassandra configs
val sourceCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "false",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>"
)

//target cassandra configs
val targetCassandra = Map( 
    "spark.cassandra.connection.host" -> "<Source Cassandra Host>",
    "spark.cassandra.connection.port" -> "9042",
    "spark.cassandra.auth.username" -> "<USERNAME>",
    "spark.cassandra.auth.password" -> "<PASSWORD>",
    "spark.cassandra.connection.ssl.enabled" -> "true",
    "keyspace" -> "<KEYSPACE>",
    "table" -> "<TABLE>",
    //throughput related settings below - tweak these depending on data volumes. 
    "spark.cassandra.output.batch.size.rows"-> "1",
    "spark.cassandra.output.concurrent.writes" -> "1000",
    "spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
    "spark.cassandra.concurrent.reads" -> "512",
    "spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
    "spark.cassandra.connection.keep_alive_ms" -> "600000000"
)

//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(sourceCassandra)
  .load
  
//Write to target Cassandra
DFfromSourceCassandra
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(targetCassandra)
  .mode(SaveMode.Append) // only required for Spark 3.x
  .save

참고 항목

각 행의 원래 writetime을 보존해야 하는 경우 cassandra migrator 샘플을 참조하세요.

다음 단계