Exercício - Criar o produtor Kafka
Agora que os clusters Kafka e Spark estão implantados, vamos adicionar um produtor de Kafka ao nó principal de Kafka. Este produtor é um estimulador do preço das ações, que produz preços artificiais das ações.
Transferir o exemplo
- No seu navegador de internet, vá para https://github.com/Azure/hdinsight-mslearn e baixe ou clone o exemplo localmente, se você ainda não o fez em um módulo anterior.
- Abra o arquivo Spark Structured Streaming\python-producer-simulator-template.py localmente.
Recuperar as URLs do corretor Kafka
Em seguida, você precisa recuperar as URLs do broker Kafka usando ssh no headnode e adicionando as URLs ao arquivo python.
Para se conectar ao nó principal principal do cluster Apache Kafka, você precisa ssh no nó. O Azure Cloud Shell no portal do Azure é a maneira recomendada de se conectar. No portal do Azure, clique no botão Azure Cloud Shell na barra de ferramentas superior e selecione Bash. Você também pode usar um prompt de comando habilitado para ssh, como o Git Bash.
Se você não tiver usado o Azure Cloud Shell antes, uma notificação informando que você não tem armazenamento montado será exibida. Selecione sua assinatura do Azure na caixa Assinatura e clique em Criar Armazenamento.
No prompt da nuvem, cole o seguinte comando. Substitua
sshuser
pelo nome de utilizador SSH. Substituakafka-mslearn-stock
pelo nome do cluster Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
Quando se liga pela primeira vez ao cluster, o cliente SSH pode apresentar um aviso a indicar que não é possível estabelecer a autenticidade do anfitrião. Quando lhe for pedido, escreva sim e, em seguida, prima Enter para adicionar o anfitrião à lista de servidores fidedignos do seu cliente SSH.
Quando lhe for pedido, introduza a palavra-passe do utilizador SSH.
Quando estiver ligado, verá informações semelhantes ao texto seguinte:
Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64) * Documentation: https://help.ubuntu.com * Management: https://landscape.canonical.com * Support: https://ubuntu.com/advantage * Overheard at KubeCon: "microk8s.status just blew my mind". https://microk8s.io/docs/commands#microk8s.status 0 packages can be updated. 0 updates are security updates. Welcome to Kafka on HDInsight. The programs included with the Ubuntu system are free software; the exact distribution terms for each program are described in the individual files in /usr/share/doc/*/copyright. Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by applicable law. To run a command as administrator (user "root"), use "sudo <command>". See "man sudo_root" for details.
Instale jq, um processador JSON de linha de comando. Este utilitário é usado para analisar documentos JSON e é útil na análise das informações do host. A partir da conexão SSH aberta, digite o seguinte comando para instalar
jq
:sudo apt -y install jq
Configure a variável de senha. Substitua
PASSWORD
pela senha de login do cluster e digite o comando:export password='PASSWORD'
Extraia o nome do cluster com caixa correta. O invólucro real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Este comando obterá o invólucro real e, em seguida, armazená-lo-á em uma variável. Introduza o seguinte comando:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
Este comando não tem resposta.
Para definir uma variável de ambiente com as informações do host do Zookeeper, use o comando abaixo. O comando recupera todos os hosts do Zookeeper e, em seguida, retorna apenas as duas primeiras entradas. Isto acontece porque é desejável que exista alguma redundância para o caso de um anfitrião estar inacessível.
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
Nota
Este comando requer acesso Ambari. Se o cluster estiver atrás de um NSG, execute este comando a partir de uma máquina que possa acessar o Ambari.
Este comando também não tem resposta.
Para verificar se a variável de ambiente está definida corretamente, utilize o comando seguinte:
echo $KAFKAZKHOSTS
Este comando devolve informações semelhantes ao texto seguinte:
zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181
Para definir uma variável de ambiente com as informações do anfitrião do mediador do Apache Kafka, utilize o comando seguinte:
export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
Nota
Este comando requer acesso Ambari. Se o cluster estiver atrás de um NSG, execute este comando a partir de uma máquina que possa acessar o Ambari.
Este comando não tem saída.
Para verificar se a variável de ambiente está definida corretamente, utilize o comando seguinte:
echo $KAFKABROKERS
Este comando devolve informações semelhantes ao texto seguinte:
wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Copie um dos valores do broker Kafka retornados na etapa anterior para o arquivo python-producer-simulator-template.py na linha 19 e inclua aspas simples ao redor do valor, por exemplo:
kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
Salve o arquivo python-producer-simulator-template-simulator-template.py.
De volta à janela de conexão ssh, use o seguinte comando para criar um tópico.
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
Este comando se conecta ao Zookeeper usando as informações do host armazenadas no $KAFKAZKHOSTS. Em seguida, ele cria um tópico Apache Kafka chamado stockVals, para corresponder ao nome do tópico em python-producer-simulator-template.py.
Copie o arquivo python para o nó principal e execute o arquivo para transmitir dados
Em uma nova janela do git, navegue até o local do arquivo python-producer-simulator-template.py e copie o arquivo do computador local para o nó principal principal usando o comando a seguir. Substitua
kafka-mslearn-stock
pelo nome do cluster Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
Quando lhe for perguntado se pretende continuar a ligar-se, escreva sim. Em seguida, no prompt, digite a senha para o cluster. Após as transferências de arquivos, a seguinte saída é exibida.
python-producer-simulator-template.py 100% 1896 71.9KB/s 00:00
Agora volte para o prompt de comando do Azure, onde você recuperou as informações do broker e execute o seguinte comando para instalar o Kafka:
sudo pip install kafka-python
Depois que o Kafka é instalado com êxito, a seguinte saída é exibida.
Installing collected packages: kafka-python Successfully installed kafka-python-1.4.7
Na mesma janela, instale solicitações usando o seguinte comando:
sudo apt-get install python-requests
Quando perguntado "Após esta operação, 4.327 kB de espaço em disco adicional serão usados. Quer continuar? [S/n]" tipo y.
Quando as solicitações são instaladas com êxito, uma saída semelhante à seguinte é exibida.
Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ... Setting up python-requests (2.9.1-3ubuntu0.1) ...
Na mesma janela, use o seguinte comando para executar o arquivo python
python python-producer-simulator-template.py
Deverá ver um resultado semelhante ao seguinte:
No loops argument provided. Default loops are 1000 Running in simulated mode [ { "symbol": "MSFT", "size": 355, "price": 147.205, "time": 1578029521022 }, { "symbol": "BA", "size": 345, "price": 352.607, "time": 1578029521022 }, { "symbol": "JNJ", "size": 58, "price": 142.043, "time": 1578029521022 }, { "symbol": "F", "size": 380, "price": 8.545, "time": 1578029521022 }, { "symbol": "TSLA", "size": 442, "price": 329.342, "time": 1578029521022 }, { "symbol": "BAC", "size": 167, "price": 32.921, "time": 1578029521022 }, { "symbol": "GE", "size": 222, "price": 11.115, "time": 1578029521022 }, { "symbol": "MMM", "size": 312, "price": 174.643, "time": 1578029521022 }, { "symbol": "INTC", "size": 483, "price": 54.978, "time": 1578029521022 }, { "symbol": "WMT", "size": 387, "price": 120.355, "time": 1578029521022 } ] stockVals 2 0 stockVals 1 0 stockVals 3 0 stockVals 2 1 stockVals 7 0 stockVals 7 1 stockVals 1 1 stockVals 4 0 stockVals 4 1 stockVals 1 2
Esta saída fornece os preços de ações simulados para as ações listadas no arquivo python-producer-simulated-template.py, seguido pelo tópico, partição e deslocamento da mensagem no tópico. Você pode ver que cada vez que o produtor é acionado (a cada segundo), um novo lote de preços de ações é gerado e cada nova mensagem é adicionada a uma partição em um determinado deslocamento.