Captura de Dados de Alteração (CDA) da tabela PostgreSQL usando o Apache Flink®
Observação
Desativaremos o Microsoft Azure HDInsight no AKS em 31 de janeiro de 2025. Para evitar o encerramento abrupto das suas cargas de trabalho, você precisará migrá-las para o Microsoft Fabric ou para um produto equivalente do Azure antes de 31 de janeiro de 2025. Os clusters restantes em sua assinatura serão interrompidos e removidos do host.
Somente o suporte básico estará disponível até a data de desativação.
Importante
Esse recurso está atualmente na visualização. Os Termos de uso complementares para versões prévias do Microsoft Azure incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, confira Informações sobre a versão prévia do Azure HDInsight no AKS. No caso de perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para ver mais atualizações sobre a Comunidade do Azure HDInsight.
A Captura de Dados de Alterações (CDA) é uma técnica usada para acompanhar alterações no nível da linha em tabelas de banco de dados em resposta às operações de criação, atualização e exclusão. Nesse artigo, usamos conectores CDA para Apache Flink®, que oferecem um conjunto de conectores de origem para Apache Flink. Os conectores integram o Debezium® como o mecanismo para capturar as alterações de dados.
O Flink dá suporte para interpretar mensagens JSON e Avro do Debezium como mensagens INSERT/UPDATE/DELETE no sistema SQL do Apache Flink.
Esse suporte é útil em muitos casos para:
- Sincronizar dados incrementais de bancos de dados para outros sistemas
- Logs de auditoria
- Criar exibições materializadas em tempo real em bancos de dados
- Exibir o histórico de alteração de junção temporal de uma tabela de banco de dados
Agora, vamos aprender a monitorar as alterações na tabela PostgreSQL usando o CDA do Flink-SQL. O conector CDA do PostgreSQL permite ler dados de instantâneo e dados incrementais do banco de dados PostgreSQL.
Pré-requisitos
- Servidor flexível do PostgresSQL do Azure versão 14.7
- Cluster do Apache Flink no HDInsight no AKS
- Máquina virtual do Linux para usar o cliente PostgreSQL
- Adicione a regra NSG que permite conexões de entrada e saída na porta 5432 na sub-rede do pool do HDInsight no AKS.
Preparar tabela e Cliente do PostgreSQL
Usando uma máquina virtual Linux, instale o cliente PostgreSQL usando os comandos abaixo
sudo apt-get update sudo apt-get install postgresql-client
Instale o certificado para se conectar ao servidor PostgreSQL usando SSL
wget --no-check-certificate https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
Conecte-se ao servidor (substituir host, nome de usuário e nome de banco de dados adequadamente)
psql --host=flinkpostgres.postgres.database.azure.com --port=5432 --username=admin --dbname=postgres --set=sslmode=require --set=sslrootcert=DigiCertGlobalRootCA.crt.pem
Depois de se conectar ao banco de dados com êxito, crie uma tabela de exemplo
CREATE TABLE shipments ( shipment_id SERIAL NOT NULL PRIMARY KEY, order_id SERIAL NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ); ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001; ALTER TABLE public.shipments REPLICA IDENTITY FULL; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), (default,10002,'Hangzhou','Shanghai',false), (default,10003,'Shanghai','Hangzhou',false);
Para habilitar a CDA no banco de dados PostgreSQL, você precisará fazer as alterações a seguir.
Criar tabela CDA PostgreSQL no Apache Flink
Para criar a tabela CDA PostgreSQL do Flink, baixe todos os jars dependentes. Crie um arquivo
pom.xml
com o seguinte conteúdo.<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dep.download</groupId> <artifactId>dep-download</artifactId> <version>1.0-SNAPSHOT</version> <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc --> <dependencies> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-postgres-cdc</artifactId> <version>2.4.2</version> </dependency> </dependencies> </project>
Use o comando maven para baixar todos os jars dependentes
mvn -DoutputDirectory=target -f pom.xml dependency:copy-dependencies -X
Observação
- Se o pod ssh da Web não contiver maven, siga os links para baixá-lo e instalá-lo.
- Para baixar o arquivo jar jsr, use o comando a seguir
wget https://repo1.maven.org/maven2/net/java/loci/jsr308-all/1.1.2/jsr308-all-1.1.2.jar
Depois que os jars dependentes forem baixados, inicie o cliente SQL do Flink, com esses jars a serem importados para a sessão. Conclua o comando da seguinte maneira,
/opt/flink-webssh/bin/sql-client.sh -j /opt/flink-webssh/target/flink-sql-connector-postgres-cdc-2.4.2.jar -j /opt/flink-webssh/target/slf4j-api-1.7.15.jar -j /opt/flink-webssh/target/hamcrest-2.1.jar -j /opt/flink-webssh/target/flink-shaded-guava-31.1-jre-17.0.jar-j /opt/flink-webssh/target/awaitility-4.0.1.jar -j /opt/flink-webssh/target/jsr308-all-1.1.2.jar
Esses comandos iniciam o cliente sql com as dependências como,
user@sshnode-0 [ ~ ]$ bin/sql-client.sh -j flink-sql-connector-postgres-cdc-2.4.2.jar -j slf4j-api-1.7.15.jar -j hamcrest-2.1.jar -j flink-shaded-guava-31.1-jre-17.0.jar -j awaitility-4.0.1.jar -j jsr308-all-1.1.2.jar ???????? ???????????????? ??????? ??????? ? ???? ????????? ????? ??? ??????? ????? ??? ??? ????? ?? ??????????????? ?? ? ??? ?????? ????? ????? ???? ????? ????? ??????? ??? ??????? ??? ????????? ?? ?? ?????????? ???????? ?? ? ?? ??????? ???? ??? ? ?? ???????? ????? ???? ? ?? ? ?? ???????? ???? ?? ???? ???? ?????????? ??? ?? ???? ???? ?? ??? ??????????? ???? ? ? ??? ??? ?? ??? ????????? ???? ??? ?? ? ??????? ???????? ??? ?? ??? ??? ???????????????????? ???? ? ????? ??? ?????? ???????? ???? ?? ???????? ??????????????? ?? ?? ???? ??????? ??? ?????? ?? ??? ??? ??? ??? ??????? ???? ????????????? ??? ????? ???? ?? ?? ???? ??? ?? ??? ? ?? ?? ?? ?? ?? ?? ?? ???????? ?? ????? ?? ??????????? ?? ?? ???? ? ??????? ?? ??? ????? ?? ??????????? ???? ???? ??????? ???????? ????? ?? ???? ????? ????????????????????????????????? ????? ______ _ _ _ _____ ____ _ _____ _ _ _ BETA | ____| (_) | | / ____|/ __ \| | / ____| (_) | | | |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_ | __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __| | | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_ |_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__| Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit. Command history file path: /home/xcao/.flink-sql-history Flink SQL>
Criar uma tabela CDA do PostgreSQL do Flink usando o conector CDA
CREATE TABLE shipments ( shipment_id INT, order_id INT, origin STRING, destination STRING, is_arrived BOOLEAN, PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', 'hostname' = 'flinkpostgres.postgres.database.azure.com', 'port' = '5432', 'username' = 'username', 'password' = 'password', 'database-name' = 'postgres', 'schema-name' = 'public', 'table-name' = 'shipments', 'decoding.plugin.name' = 'pgoutput', 'slot.name' = 'flink' );
Validação
Referência
- Site do Apache Flink
- O Conector CDA do PostgreSQL é licenciado sob a licença do Apache 2.0
- Apache, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas registradas da Apache Software Foundation (ASF).