Миграция в Управляемый экземпляр Azure для Apache Cassandra с использованием Apache Spark
По возможности мы советуем использовать собственную репликацию Apache Cassandra для переноса данных из существующего кластера в Управляемый экземпляр Azure для Apache Cassandra, настроив гибридный кластер. При этом будет использоваться протокол Apache Cassandra для репликации данных из центра данных в новый управляемый экземпляр. Однако возможны ситуации, когда версия базы данных — источника несовместима или установка гибридного кластера нецелесообразна.
В этом учебнике описывается, как перенести данные в Управляемый экземпляр Azure для Apache Cassandra в автономном режиме с помощью соединителя Cassandra Spark и Azure Databricks для Apache Spark.
Необходимые компоненты
Подготовьте кластер Управляемого экземпляра Azure для Apache Cassandra с помощью портала Azure или Azure CLI и убедитесь в том, что вы можете подключиться к кластеру с использованием CQLSH.
Подготовьте учетную запись Azure Databricks в управляемой виртуальной сети Cassandra. Убедитесь, что она также имеет сетевой доступ к исходному кластеру Cassandra.
Убедитесь, что вы уже перенесли схему пространства ключей/таблицы из исходной базы данных Cassandra в целевую базу данных управляемого экземпляра Cassandra.
Предоставление кластера Azure Databricks
Мы рекомендуем выбрать среду выполнения Databricks версии 7.5, которая поддерживает Spark 3.0.
Добавление зависимостей
Добавьте библиотеку соединителей Apache Spark Cassandra в кластер для подключения к собственным конечным точкам, а также к конечным точкам Cassandra в Azure Cosmos DB. В кластере выберите Библиотеки>Установить>Maven, а затем добавьте com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0
в координаты Maven.
Нажмите кнопку Установить, а затем перезапустите кластер после завершения установки.
Примечание.
Убедитесь, что кластер Databricks перезапускается после установки библиотеки соединителей Cassandra.
Создание записной книжки Scala для миграции
Создайте записную книжку Scala в Databricks. Замените исходные и целевые конфигурации Cassandra соответствующими учетными данными, а также исходными и целевыми пространствами ключей и таблицами. Затем выполните следующий код.
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.SparkContext
// source cassandra configs
val sourceCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "false",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>"
)
//target cassandra configs
val targetCassandra = Map(
"spark.cassandra.connection.host" -> "<Source Cassandra Host>",
"spark.cassandra.connection.port" -> "9042",
"spark.cassandra.auth.username" -> "<USERNAME>",
"spark.cassandra.auth.password" -> "<PASSWORD>",
"spark.cassandra.connection.ssl.enabled" -> "true",
"keyspace" -> "<KEYSPACE>",
"table" -> "<TABLE>",
//throughput related settings below - tweak these depending on data volumes.
"spark.cassandra.output.batch.size.rows"-> "1",
"spark.cassandra.output.concurrent.writes" -> "1000",
"spark.cassandra.connection.remoteConnectionsPerExecutor" -> "10",
"spark.cassandra.concurrent.reads" -> "512",
"spark.cassandra.output.batch.grouping.buffer.size" -> "1000",
"spark.cassandra.connection.keep_alive_ms" -> "600000000"
)
//Read from source Cassandra
val DFfromSourceCassandra = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(sourceCassandra)
.load
//Write to target Cassandra
DFfromSourceCassandra
.write
.format("org.apache.spark.sql.cassandra")
.options(targetCassandra)
.mode(SaveMode.Append) // only required for Spark 3.x
.save
Примечание.
Если у вас есть необходимость сохранить исходную строку writetime
, обратитесь к примеру миграции cassandra.