Поделиться через


Операции DDL в Azure Cosmos DB для Apache Cassandra из Spark

Область применения: Кассандра

В этой статье описано пространство ключей и операции DDL таблицы с Azure Cosmos DB для Apache Cassandra из Spark.

Контекст Spark

Соединитель для API для Cassandra требует инициализации сведений о подключении Cassandra в рамках контекста Spark. При запуске записной книжки контекст Spark уже инициализирован, поэтому не рекомендуется останавливать и повторно инициализировать его. Одним из решений является добавление конфигурации API для экземпляра Cassandra на уровне кластера в конфигурацию spark кластера. Это разовое действие для каждого кластера. Добавьте в конфигурацию Spark следующий код как пару значений ключа, разделенных пробелами:

spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY

//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
spark.cassandra.output.concurrent.writes  1000  
spark.cassandra.concurrent.reads  512  
spark.cassandra.output.batch.grouping.buffer.size  1000  
spark.cassandra.connection.keep_alive_ms  600000000  
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")

Примечание.

Если вы используете Spark 3.x, вам не нужно устанавливать вспомогательный сервер Azure Cosmos DB и фабрику подключений. Также необходимо использовать remoteConnectionsPerExecutor вместо connections_per_executor_max для соединителя Spark 3 (см. выше).

Предупреждение

Примеры для Spark 3 в этой статье протестированы с использованием Spark версии 3.2.1 и соответствующего соединителя Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Более поздние версии Spark и (или) соединителя Cassandra могут работать непредсказуемым образом.

Операции DDL для пространства ключей

Создание пространства ключей

//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)

// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))

Проверка в cqlsh

Выполните следующую команду в cqlsh, и вы увидите созданное ранее пространство ключей.

DESCRIBE keyspaces;

Удаление пространства ключей

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))

Проверка в cqlsh

DESCRIBE keyspaces;

Операции DDL для таблиц

Соображения.

  • Пропускную способность можно назначить на уровне таблицы с помощью инструкции create table.
  • Один ключ секции может хранить 20 ГБ данных.
  • Одна запись может хранить до 2 МБ данных.
  • Один диапазон ключей секций может хранить несколько ключей секции.

Создание таблицы

cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))

Проверка в cqlsh

Выполните следующую команду в cqlsh, и увидите таблицу с именем "books":

USE books_ks;
DESCRIBE books;

Подготовленные значения пропускной способности и срока жизни по умолчанию не отображаются в выходных данных предыдущей команды. Эти значения можно получить на портале.

Изменение таблицы

Следующие значения можно изменить с помощью команды alter table:

  • Подготовленная пропускная способность
  • Значение времени в реальном времени
    Изменения столбцов в настоящее время не поддерживаются.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))

Удаление таблицы

val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))

Проверка в cqlsh

Выполните следующую команду в cqlsh, и вы увидите, что таблица "books" больше не доступна:

USE books_ks;
DESCRIBE tables;

Следующие шаги

После создания пространства ключей и таблицы перейдите к следующим статьям об операциях CRUD и других аспектах: