Consultar Amazon Redshift utilizando Azure Databricks
Puede leer y escribir tablas de Amazon Redshift con Azure Databricks.
Importante
Las configuraciones descritas en este artículo son experimentales. Las características experimentales se proporcionan tal cual y no cuentan con soporte técnico de Databricks a través del soporte técnico al cliente. Para obtener compatibilidad completa con la federación de consultas, debe usar la federación de Lakehouse, que permite a los usuarios de Azure Databricks aprovechar la sintaxis de Unity Catalog y las herramientas de gobernanza de datos.
El origen de datos de Redshift para Databricks usa Amazon S3 para transferir datos de forma eficaz dentro y fuera de Redshift, y usa JDBC para desencadenar automáticamente los comandos COPY
y UNLOAD
adecuados en Redshift.
Nota:
En Databricks Runtime 11.3 LTS y versiones posteriores, Databricks Runtime incluye el controlador JDBC de Redshift, accesible mediante la palabra clave redshift
para la opción de formato. Consulte las notas de la versión de Databricks Runtime versiones y compatibilidad para conocer las versiones de los controladores incluidos en cada Databricks Runtime. Los controladores proporcionados por el usuario siguen siendo compatibles y tienen prioridad sobre el controlador JDBC agrupado.
En Databricks Runtime 10.4 LTS y a continuación, se requiere la instalación manual del controlador JDBC de Redshift y las consultas deben usar el controlador (com.databricks.spark.redshift
) para el formato. Vea Instalación del controlador de Redshift.
Uso
En los ejemplos siguientes se muestra cómo conectarse con el controlador de Redshift. Reemplace los valores del parámetro url
si usa el controlador JDBC de PostgreSQL.
Una vez que haya configurado las credenciales de AWS, puede usar el origen de datos con la API de origen de datos de Spark en Python, SQL, R o Scala.
Importante
Las ubicaciones externas definidas en Unity Catalog no se admiten como ubicaciones tempdir
.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Leer los datos mediante SQL en Databricks Runtime 10.4 LTS y los siguientes:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Leer los datos mediante SQL en Databricks Runtime 11.3 LTS y versiones posteriores:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Escriba datos mediante SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL API solo admite la creación de tablas, no sobrescribir ni anexar.
R
Leer los datos con R en Databricks Runtime 10.4 LTS y los siguientes:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Leer datos usando R en Databricks Runtime 11.3 LTS y versiones posteriores:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Recomendaciones para trabajar con Redshift
La ejecución de consultas puede extraer grandes cantidades de datos en S3. Si tiene previsto realizar varias consultas con los mismos datos en Redshift, Databricks recomienda guardar los datos extraídos mediante Delta Lake.
Configuración
Autenticación en S3 y Redshift
El origen de datos implica varias conexiones de red, que se muestran en el diagrama siguiente:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
El origen de datos lee y escribe datos en S3 al transferir datos hacia o desde Redshift. Como resultado, requiere credenciales de AWS con acceso de lectura y escritura a un cubo S3 (especificado mediante el parámetro de configuración tempdir
).
Nota:
El origen de datos no limpia los archivos temporales que crea en S3. Como consecuencia de esto, se recomienda usar un cubo S3 temporal dedicado con una configuración del ciclo de vida de objetos para asegurarse de que los archivos temporales se eliminan automáticamente después de un periodo de expiración especificado. Vea la sección Cifrado de este documento para obtener una explicación sobre cómo cifrar estos archivos. No puede usar una ubicación externa definida en Unity Catalog como ubicación tempdir
.
En las secciones siguientes, se describen las opciones de configuración de autenticación de cada conexión:
Controlador de Spark a Redshift
El controlador de Spark se conecta a Redshift mediante JDBC con un nombre de usuario y una contraseña. Redshift no admite el uso de roles de IAM para autenticar esta conexión. De manera predeterminada, esta conexión usa el cifrado SSL; para obtener más información, vea Cifrado.
Spark a S3
S3 sirve como intermediario para almacenar datos masivos al leer o escribir en Redshift. Spark se conecta a S3 mediante las interfaces de Hadoop FileSystem y directamente mediante el cliente S3 del SDK de Amazon Java.
Nota:
No puede usar montajes DBFS para configurar el acceso a S3 para Redshift.
Establecer claves en la configuración de Hadoop: especifique las claves de AWS en las propiedades de configuración de Hadoop. Si la
tempdir
configuración apunta a uns3a://
sistema de archivos, puede establecer lasfs.s3a.access.key
propiedades yfs.s3a.secret.key
en un archivo de configuración XML de Hadoop o llamarsc.hadoopConfiguration.set()
a para configurar la configuración global de Hadoop de Spark. Si usa uns3n://
sistema de archivos, puede proporcionar las claves de configuración heredadas, como se muestra en el ejemplo siguiente.Scala
Por ejemplo, si usa el
s3a
sistema de archivos, agregue:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Para el sistema de archivos heredado
s3n
, añada:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Aunque el comando siguiente se basa en algunos elementos internos de Spark, debería funcionar con todas las versiones de PySpark y es poco probable que se interrumpa o se modifique en el futuro:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift a S3
Establezca la forward_spark_s3_credentials
opción para true
reenviar automáticamente las credenciales de clave de AWS que Spark usa para conectarse a S3 a través de JDBC a Redshift. La consulta JDBC incorpora estas credenciales, por lo que Databricks le recomienda encarecidamente que active el cifrado SSL de la conexión JDBC.
Cifrado
Protección de JDBC: a menos que cualquier configuración relacionada con SSL esté presente en la dirección URL de JDBC, el origen de datos habilita de manera predeterminada el cifrado SSL y también comprueba que el servidor de Redshift es de confianza (es decir,
sslmode=verify-full
). Para ello, se descarga automáticamente un certificado de servidor de los servidores de Amazon la primera vez que se necesita. En caso de que se produzca un error, se usa un archivo de certificado previamente agrupado como reserva. Esto se aplica para los controladores JDBC de Redshift y PostgreSQL.En caso de que haya algún problema con esta característica, o simplemente quiera deshabilitar SSL, puede llamar a
.option("autoenablessl", "false")
en el elementoDataFrameReader
oDataFrameWriter
.Si quiere especificar la configuración personalizada relacionada con SSL, puede seguir las instrucciones de la documentación de Redshift: Uso de certificados SSL y de servidor en Java y Opciones de configuración del controlador JDBC. Cualquier opción relacionada con SSL presente en el JDBC
url
usado con el origen de datos tiene prioridad (es decir, no se desencadenará la configuración automática).Cifrado de datos UNLOAD almacenados en S3 (datos almacenados al leer desde Redshift): según la documentación de Redshift sobre Descarga de datos en S3, "UNLOAD cifra automáticamente los archivos de datos mediante el cifrado del lado servidor de Amazon S3 (SSE-S3)".
Redshift también admite el cifrado del lado cliente con una clave personalizada (vea: Descarga de archivos de datos cifrados), pero el origen de datos carece de la capacidad para especificar la clave simétrica necesaria.
Cifrado de datos COPY almacenados en S3 (datos almacenados al escribir en Redshift): según la documentación de Redshift sobre Carga de archivos de datos cifrados desde Amazon S3:
Puede usar el comando COPY
para cargar archivos de datos que se cargaron en Amazon S3 mediante el cifrado del lado servidor con claves de cifrado administradas por AWS (SSE-S3 o SSE-KMS), el cifrado del lado cliente o ambos. COPY no admite el cifrado del lado servidor de Amazon S3 con una clave proporcionada por el cliente (SSE-C).
Parámetros
El mapa de parámetros u OPTIONS que se proporcionan en Spark SQL admiten la configuración siguiente:
Parámetro | Obligatorio | Valor predeterminado | Descripción |
---|---|---|---|
dbtable | Sí, a menos que se especifique query. | Ninguno | Tabla desde la que se va a crear o leer en Redshift. Se requiere este parámetro al guardar los datos de vuelta en Redshift. |
query | Sí, a menos que se especifique dbtable. | Ninguno | Consulta de la que se va a leer en Redshift. |
usuario | No | Ninguno | Nombre de usuario de Redshift. Se debe usar en conjunto con la opción de contraseña. Solo se puede usar si el usuario y la contraseña no se pasan en la dirección URL, si se pasan ambos se producirá un error. Use este parámetro cuando el nombre de usuario contenga caracteres especiales que deban escaparse. |
password | No | Ninguno | Contraseña de Redshift. Se debe usar en conjunto con la opción user . Solo se puede usar si el usuario y la contraseña no se pasan en la dirección URL, si se pasan ambos se producirá un error. Use este parámetro cuando la contraseña contenga caracteres especiales que deben escaparse. |
dirección url | Sí | Ninguno | Dirección URL de JDBC, con el formatojdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol puede ser postgresql o redshift , en función del controlador JDBC que haya cargado. Un controlador compatible con Redshift debe estar en la ruta de clase y coincidir con esta dirección URL. host y port deben apuntar al nodo maestro de Redshift, por lo que los grupos de seguridad o VPC deben configurarse para permitir el acceso desde la aplicación de controlador.database identifica un nombre user de base de datos de Redshift y password son credenciales para acceder a la base de datos, que se debe incrustar en esta dirección URL para JDBC, y la cuenta de usuario debe tener privilegios necesarios para la tabla a la que se hace referencia. |
search_path | No | Ninguno | Establezca la ruta de búsqueda de esquemas en Redshift. Se establecerá mediante el comando SET search_path to . Debe ser una lista separada por comas de nombres de esquema en la que buscar tablas. Consulte la documentación de Redshift de search_path. |
aws_iam_role | Solo si usa roles de IAM para autorizar. | Ninguno | ARN totalmente especificado del rol de operaciones COPY/UNLOAD de IAM adjunto al clúster de Redshift, por ejemplo, arn:aws:iam::123456789000:role/<redshift-iam-role> . |
forward_spark_s3_credentials | No | false |
Si es true , el origen de datos detecta automáticamente las credenciales que Spark utiliza para conectarse a S3 y reenvía esas credenciales a Redshift a través de JDBC. Estas credenciales se envían como parte de la consulta JDBC, por lo que se recomienda encarecidamente habilitar el cifrado SSL de la conexión JDBC al usar esta opción. |
temporary_aws_access_key_id | No | Ninguno | La clave de acceso de AWS debe tener permisos de escritura en el cubo S3. |
temporary_aws_secret_access_key | No | Ninguno | Clave de acceso secreta de AWS correspondiente a la clave de acceso proporcionada. |
temporary_aws_session_token | No | Ninguno | Token de sesión de AWS correspondiente a la clave de acceso proporcionada. |
tempdir | Sí | Ninguno | Una ubicación grabable en Amazon S3, que se usará para los datos descargados al leer y datos de Avro que se van a cargar en Redshift al escribir. Si usa el origen de datos de Redshift para Spark como parte de una canalización de ETL normal, puede resultar útil establecer una directiva de ciclo de vida en un cubo y usarla como ubicación temporal para estos datos. No puede usarubicaciones externas definidas en Unity Catalog como ubicaciones tempdir . |
jdbcdriver | No | Determinado por el subprotocolo de la dirección URL de JDBC. | Nombre de clase del controlador JDBC que se usará. Esta clase debe estar en la ruta de clases. En la mayoría de los casos, no debería ser necesario especificar esta opción, ya que el subprotocolo de la dirección URL de JDBC debe determinar automáticamente el nombre de clase del controlador adecuado. |
diststyle | No | EVEN |
Estilo de distribución de Redshift que se usará al crear una tabla. Puede ser uno de EVEN , KEY o ALL (consulte la documentación de Redshift). Al usar KEY , también debe establecer una clave de distribución con la opción distkey. |
distkey | No, a menos que use DISTSTYLE KEY . |
Ninguno | Nombre de una columna de la tabla que se va a usar como clave de distribución al crear una tabla. |
sortkeyspec | No | Ninguno | Una definición completa de clave de ordenación de Redshift. Algunos ejemplos son: - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable (en desuso) | No | true |
Establecer esta opción en desuso para false hace que la tabla de destino de una operación de sobrescritura se quite inmediatamente al principio de la escritura, lo que hace que la operación de sobrescritura no sea atómica y reduzca la disponibilidad de la tabla de destino. Esto puede reducir los requisitos de espacio en disco temporal para sobrescrituras.Dado que la configuración de la operación usestagingtable=false corre riesgos de pérdida de datos o falta de disponibilidad, está en desuso en favor de requerir que quite manualmente la tabla de destino. |
descripción | No | Ninguno | Una descripción de la tabla. Se establecerá con el comando SQL COMMENT y debe aparecer en la mayoría de las herramientas de consulta. Consulte también los metadatos description para establecer descripciones en columnas individuales. |
preactions | No | Ninguno | Lista ; separada de comandos SQL que se van a ejecutar antes de cargar el comando COPY . Puede ser útil tener algunos comandos DELETE o una ejecución similar aquí antes de cargar nuevos datos. Si el comando contiene %s , el nombre de la tabla tiene el formato antes de la ejecución (en caso de que use una tabla de almacenamiento provisional).Tenga en cuenta que si se produce un error en estos comandos, se trata como un error y se produce una excepción. Si se usa una tabla de almacenamiento provisional, se revierten los cambios y se restaura la tabla de copia de seguridad si se produce un error en las acciones previas. |
postactions | No | Ninguno | Lista ; separada de comandos SQL que se van a ejecutar después de COPY con éxito al cargar los datos. Puede ser útil tener algunos comandos GRANT o una ejecución similar aquí al cargar nuevos datos. Si el comando contiene %s , el nombre de la tabla tiene el formato antes de la ejecución (en caso de que use una tabla de almacenamiento provisional).Tenga en cuenta que si se produce un error en estos comandos, se trata como un error y se produce una excepción. Si se usa una tabla de almacenamiento provisional, se revierten los cambios y se restaura la tabla de copia de seguridad si se produce un error en las acciones posteriores. |
extracopyoptions | No | Ninguno | Lista de opciones adicionales que se van a anexar al comando COPY de Redshift al cargar datos, por ejemplo,TRUNCATECOLUMNS o MAXERROR n (consulte la documentación de Redshift para ver otras opciones).Dado que estas opciones se anexan al final del comando COPY , solo se pueden usar opciones que tengan sentido al final del comando, pero esto debe abarcar la mayoría de los casos de uso posibles. |
tempformat | No | AVRO |
Formato en el que se guardan los archivos temporales en S3 al escribir en Redshift. Tiene como valor predeterminado.AVRO ; los demás valores permitidos son CSV y CSV GZIP para CSV y CSV descomprimido, respectivamente.Redshift es significativamente más rápido al cargar CSV que al cargar archivos Avro, por lo que el uso de tempformat puede proporcionar un gran aumento del rendimiento al escribir en Redshift. |
csvnullstring | No | @NULL@ |
Valor de tipo cadena que se va a escribir para valores NULL cuando se usa el formato tempformat csv. Debe ser un valor que no aparezca en los datos reales. |
csvseparator | No | , |
Separador que se va a usar al escribir archivos temporales con tempformat establecido en CSV oCSV GZIP . Debe ser un carácter ASCII válido, por ejemplo, ", " o "\| ". |
csvignoreleadingwhitespace | No | true |
Cuando se establece en true, elimina los espacios en blanco iniciales de los valores durante las escrituras cuandotempformat se establece en CSV o en CSV GZIP . De lo contrario, se conserva el espacio en blanco. |
csvignoretrailingwhitespace | No | true |
Cuando se establece en true, elimina los espacios en blanco al final de los valores durante las escrituras cuandotempformat se establece en CSV o en CSV GZIP . De lo contrario, se conserva el espacio en blanco. |
infer_timestamp_ntz_type | No | false |
Si es true , los valores de tipo TIMESTAMP de Redshift se interpretan como TimestampNTZType (marca de tiempo sin zona horaria) durante las lecturas. De lo contrario, todas las marcas de tiempo se interpretan como TimestampType , independientemente del tipo de la tabla de Redshift subyacente. |
Opciones de configuración adicionales
Configuración del tamaño máximo de las columnas de cadena
Al crear tablas de Redshift, el comportamiento predeterminado es crear columnas TEXT
para las que son de cadena. Redshift almacena columnas TEXT
como VARCHAR(256)
, por lo que estas columnas tienen un tamaño máximo de 256 caracteres (origen).
Para admitir columnas más grandes, puede usar el campo de metadatos de columna maxlength
a fin de especificar la longitud máxima de las columnas de cadena individuales. Esto también es útil para implementar optimizaciones de rendimiento de ahorro de espacio declarando columnas con una longitud máxima menor que la predeterminada.
Nota:
Debido a las limitaciones de Spark, las API de SQL y del lenguaje R no admiten la modificación de metadatos de columna.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Este es un ejemplo de actualización de los campos de metadatos de varias columnas mediante la API de Scala de Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Establecimiento de un tipo de columna personalizado
Si necesita establecer manualmente un tipo de columna, puede usar los metadatos de columna redshift_type
. Por ejemplo, si quiere invalidar el buscador de coincidencias de tipo de Spark SQL Schema -> Redshift SQL
para asignar un tipo de columna definido por el usuario, puede hacer lo siguiente:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Configuración de la codificación de columnas
Al crear una tabla, use el campo de metadatos de columna encoding
a fin de especificar una codificación de compresión para cada columna (vea la documentación de Amazon para obtener las codificaciones disponibles).
Establecimiento de descripciones en columnas
Redshift permite que las columnas tengan descripciones adjuntas que deben aparecer en la mayoría de las herramientas de consulta (mediante el comando COMMENT
). Puede establecer el campo de metadatos de columna description
a fin de especificar una descripción para columnas individuales.
Inserción de consultas en Redshift
El optimizador de Spark inserta los operadores siguientes en Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
En Project
y Filter
, admite las expresiones siguientes:
- La mayoría de operadores lógicos booleanos
- Comparaciones
- Operaciones aritméticas básicas
- Conversiones numéricas y de cadena
- La mayoría de funciones de cadena
- Subconsultas escalares, si se pueden insertar completamente en Redshift.
Nota:
Esta inserción no admite expresiones que funcionen en fechas y marcas de tiempo.
En Aggregation
, admite las funciones de agregación siguientes:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
combinadas con la cláusula DISTINCT
, si procede.
En Join
, admite los tipos de combinaciones siguientes:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Subconsultas que el optimizador ha reescrito en
Join
como, por ejemploWHERE EXISTS
yWHERE NOT EXISTS
Nota:
La inserción de combinación no admite FULL OUTER JOIN
.
La inserción puede ser más beneficiosa en las consultas con LIMIT
. Una consulta como SELECT * FROM large_redshift_table LIMIT 10
podría tardar mucho tiempo, ya que la tabla completa primero realizaría la acción UNLOAD en S3 como resultado intermedio. Con la inserción, LIMIT
se ejecuta en Redshift. En las consultas con agregaciones, la inserción de la agregación en Redshift también permite reducir la cantidad de datos que se deben transferir.
La inserción de consultas en Redshift está habilitada de manera predeterminada. Esta característica se puede deshabilitar si spark.databricks.redshift.pushdown
se establece en false
. Incluso cuando está deshabilitada, Spark sigue insertando filtros y realiza la eliminación de columnas en Redshift.
Instalación del controlador de Redshift
El origen de datos de Redshift también requiere un controlador JDBC compatible con Redshift. Dado que Redshift se basa en el sistema de base de datos de PostgreSQL, puede usar el controlador JDBC de PostgreSQL incluido con Databricks Runtime o el controlador JDBC de Amazon recomendado. No se requiere ninguna instalación para usar el controlador JDBC de PostgreSQL. La versión del controlador JDBC de PostgreSQL incluida en cada versión de Databricks Runtime se muestra en las notas de la versión de Databricks Runtime.
Para instalar manualmente el controlador JDBC de Redshift, haga lo siguiente:
- Descargar el controlador de Amazon.
- Cargar el controlador en el área de trabajo de Azure Databricks. Consulte Bibliotecas.
- Instalar la biblioteca en el clúster.
Nota:
Databricks recomienda usar la versión más reciente del controlador JDBC de Redshift. Las versiones del controlador JDBC de Redshift anteriores a la 1.2.41 tienen las limitaciones siguientes:
- La versión 1.2.16 del controlador devuelve datos vacíos cuando se usa una cláusula
where
en una consulta SQL. - Las versiones del controlador anteriores a la 1.2.41 pueden devolver resultados no válidos porque la nulabilidad de una columna se notifica incorrectamente como "Not Nullable" en lugar de "Unknown".
Garantías transaccionales
En esta sección se describen las garantías transaccionales del origen de datos de Redshift para Spark.
Información general sobre las propiedades de Redshift y S3
Para obtener información general sobre las garantías transaccionales de Redshift, vea el capítulo Administración de operaciones de escritura simultáneas en la documentación de Redshift. En pocas palabras, Redshift proporciona aislamiento serializable según la documentación del comando BEGIN de Redshift:
[aunque] puede usar cualquiera de los cuatro niveles de aislamiento de transacción, Amazon Redshift procesa todos los niveles de aislamiento como serializables.
Según la documentación de Redshift:
Amazon Redshift admite un comportamiento de confirmación automática predeterminado en el que cada comando SQL ejecutado de forma independiente se confirma individualmente.
Por lo tanto, los comandos individuales como COPY
y UNLOAD
son atómicos y transaccionales, mientras que los explícitos BEGIN
y END
solo deben ser necesarios para aplicar la atomicidad de varios comandos o consultas.
Al leer y escribir en Redshift, el origen de datos lee y escribe datos en S3. Tanto Spark como Redshift generan una salida con particiones y la almacenan en varios archivos en S3. Según la documentación del Modelo de coherencia de datos de Amazon S3, las operaciones de enumeración de cubos de S3 son coherentes al final, por lo que los archivos deben ir a longitudes especiales para evitar que falten datos o estén incompletos debido a este origen de coherencia final.
Garantías del origen de datos de Redshift para Spark
Anexión a una tabla existente
Al insertar filas en Redshift, el origen de datos usa el comando COPY y especifica manifiestos para protegerse contra determinadas operaciones S3 coherentes al final. Como resultado, los anexos de spark-redshift
a las tablas existentes tienen las mismas propiedades atómicas y transaccionales que los comandos COPY
normales de Redshift.
Creación de una tabla (SaveMode.CreateIfNotExists
)
La creación de una tabla es un proceso de dos pasos, que consta de un comando CREATE TABLE
seguido de otro COPY para anexar el conjunto inicial de filas. Ambas operaciones se realizan en la misma transacción.
Sobrescritura de una tabla existente
De manera predeterminada, el origen de datos usa transacciones para realizar sobrescrituras, que se implementan eliminando la tabla de destino, creando una tabla vacía y anexando filas a ella.
Si la configuración en desuso usestagingtable
se establece en false
, el origen de datos confirma el comando DELETE TABLE
antes de anexar filas a la nueva tabla, lo que sacrifica la atomicidad de la operación de sobrescritura, pero reduce la cantidad de espacio de almacenamiento provisional que Redshift necesita durante esta sobrescritura.
Consulta de tabla de Redshift
Las consultas usan el comando UNLOAD de Redshift para ejecutar una consulta, guardar sus resultados en S3 y usar manifiestos para protegerse contra determinadas operaciones de S3 coherentes al final. Como resultado, las consultas del origen de datos de Redshift para Spark deben tener las mismas propiedades de coherencia que las consultas periódicas de Redshift.
Problemas y soluciones habituales
El cubo de S3 y el clúster de Redshift se encuentran en diferentes regiones de AWS
De manera predeterminada, las copias de S3 <-> de Redshift no funcionan si el cubo de S3 y el clúster de Redshift se encuentran en regiones de AWS diferentes.
Si intenta leer una tabla de Redshift cuando el cubo de S3 está en una región diferente, es posible que vea un error como el siguiente:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Del mismo modo, intentar escribir en Redshift mediante un cubo de S3 en una región diferente puede provocar el error siguiente:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Escrituras: el comando COPY de Redshift admite la especificación explícita de la región del cubo de S3, por lo que puede hacer que las escrituras en Redshift funcionen correctamente en estos casos agregando
region 'the-region-name'
a la configuraciónextracopyoptions
. Por ejemplo, con un cubo en la región Este de EE. UU. (Virginia) y la API de Scala, use lo siguiente:.option("extracopyoptions", "region 'us-east-1'")
También puede usar la configuración
awsregion
:.option("awsregion", "us-east-1")
Lecturas: el comando UNLOAD de Redshift también admite la especificación explícita de la región del cubo de S3. Puede hacer que las lecturas funcionen correctamente agregando la región a la configuración
awsregion
:.option("awsregion", "us-east-1")
Error de autenticación al usar una contraseña con caracteres especiales en la dirección URL de JDBC
Si va a proporcionar el nombre de usuario y la contraseña como parte de la dirección URL de JDBC y la contraseña contiene caracteres especiales, como ;
, ?
o &
, es posible que vea la excepción siguiente:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Esto se debe a que los caracteres especiales del nombre de usuario o la contraseña no los ha agregado correctamente el controlador JDBC. Asegúrese de especificar el nombre de usuario y la contraseña mediante las opciones user
y password
del DataFrame correspondiente. Para obtener más información, vea Parámetros.
La consulta Spark de ejecución prolongada se bloquea indefinidamente aunque se realice la operación de Redshift correspondiente.
Si está leyendo o escribiendo grandes cantidades de datos desde y hacia Redshift, la consulta de Spark puede bloquearse indefinidamente, aunque la página Supervisión de Redshift de AWS muestre que la operación LOAD
o UNLOAD
correspondiente se ha completado y que el clúster está inactivo. Esto se debe a que la conexión entre Redshift y Spark agota el tiempo de espera. Para evitar esto, asegúrese de que la marca de JDBC tcpKeepAlive
está habilitada y TCPKeepAliveMinutes
está establecida en un valor bajo (por ejemplo, 1).
Para obtener más información, vea Configuración del controlador JDBC de Amazon Redshift.
Marca de tiempo con semántica de zona horaria
Al leer datos, los tipos de datos TIMESTAMP
y TIMESTAMPTZ
de Redshift se asignan a TimestampType
de Spark, y un valor se convierte en hora universal coordinada (UTC) y se almacena como marca de tiempo UTC. En el caso de un elemento TIMESTAMP
de Redshift, se da por hecho que la zona horaria local no tiene ninguna información de zona horaria. Al escribir datos en una tabla de Redshift, un elemento TimestampType
de Spark se asigna al tipo de datos TIMESTAMP
de Redshift.
Guía de migración
El origen de datos ahora requiere que establezca forward_spark_s3_credentials
explícitamente antes de que las credenciales de S3 de Spark se reenvíen a Redshift. Este cambio no afecta para nada si usa los mecanismos de autenticación aws_iam_role
o temporary_aws_*
. Pero si se basa en el comportamiento predeterminado anterior, ahora debe establecer explícitamente forward_spark_s3_credentials
en true
para seguir usando el mecanismo de autenticación anterior de Redshift en S3. Para obtener una explicación de los tres mecanismos de autenticación y sus ventajas y desventajas de seguridad, vea la sección Autenticación en S3 y Redshift de este documento.