Exercício - Criar o produtor Kafka

Concluído

Agora que os clusters Kafka e Spark estão implantados, vamos adicionar um produtor de Kafka ao nó principal de Kafka. Este produtor é um estimulador do preço das ações, que produz preços artificiais das ações.

Transferir o exemplo

  1. No seu navegador de internet, vá para https://github.com/Azure/hdinsight-mslearn e baixe ou clone o exemplo localmente, se você ainda não o fez em um módulo anterior.
  2. Abra o arquivo Spark Structured Streaming\python-producer-simulator-template.py localmente.

Recuperar as URLs do corretor Kafka

Em seguida, você precisa recuperar as URLs do broker Kafka usando ssh no headnode e adicionando as URLs ao arquivo python.

  1. Para se conectar ao nó principal principal do cluster Apache Kafka, você precisa ssh no nó. O Azure Cloud Shell no portal do Azure é a maneira recomendada de se conectar. No portal do Azure, clique no botão Azure Cloud Shell na barra de ferramentas superior e selecione Bash. Você também pode usar um prompt de comando habilitado para ssh, como o Git Bash.

  2. Se você não tiver usado o Azure Cloud Shell antes, uma notificação informando que você não tem armazenamento montado será exibida. Selecione sua assinatura do Azure na caixa Assinatura e clique em Criar Armazenamento.

  3. No prompt da nuvem, cole o seguinte comando. Substitua sshuser pelo nome de utilizador SSH. Substitua kafka-mslearn-stock pelo nome do cluster Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.

    ssh sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net
    
  4. Quando se liga pela primeira vez ao cluster, o cliente SSH pode apresentar um aviso a indicar que não é possível estabelecer a autenticidade do anfitrião. Quando lhe for pedido, escreva sim e, em seguida, prima Enter para adicionar o anfitrião à lista de servidores fidedignos do seu cliente SSH.

  5. Quando lhe for pedido, introduza a palavra-passe do utilizador SSH.

    Quando estiver ligado, verá informações semelhantes ao texto seguinte:

        Welcome to Ubuntu 16.04.6 LTS (GNU/Linux 4.15.0-1063-azure x86_64)
    
        * Documentation:  https://help.ubuntu.com
        * Management:     https://landscape.canonical.com
        * Support:        https://ubuntu.com/advantage
    
        * Overheard at KubeCon: "microk8s.status just blew my mind".
    
            https://microk8s.io/docs/commands#microk8s.status
    
        0 packages can be updated.
        0 updates are security updates.
    
    
    
        Welcome to Kafka on HDInsight.
    
    
        The programs included with the Ubuntu system are free software;
        the exact distribution terms for each program are described in the
        individual files in /usr/share/doc/*/copyright.
    
        Ubuntu comes with ABSOLUTELY NO WARRANTY, to the extent permitted by
        applicable law.
    
        To run a command as administrator (user "root"), use "sudo <command>".
        See "man sudo_root" for details.
    
  6. Instale jq, um processador JSON de linha de comando. Este utilitário é usado para analisar documentos JSON e é útil na análise das informações do host. A partir da conexão SSH aberta, digite o seguinte comando para instalar jq:

    sudo apt -y install jq
    
  7. Configure a variável de senha. Substitua PASSWORD pela senha de login do cluster e digite o comando:

    export password='PASSWORD'
    
  8. Extraia o nome do cluster com caixa correta. O invólucro real do nome do cluster pode ser diferente do esperado, dependendo de como o cluster foi criado. Este comando obterá o invólucro real e, em seguida, armazená-lo-á em uma variável. Introduza o seguinte comando:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Este comando não tem resposta.

  9. Para definir uma variável de ambiente com as informações do host do Zookeeper, use o comando abaixo. O comando recupera todos os hosts do Zookeeper e, em seguida, retorna apenas as duas primeiras entradas. Isto acontece porque é desejável que exista alguma redundância para o caso de um anfitrião estar inacessível.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Este comando requer acesso Ambari. Se o cluster estiver atrás de um NSG, execute este comando a partir de uma máquina que possa acessar o Ambari.

    Este comando também não tem resposta.

  10. Para verificar se a variável de ambiente está definida corretamente, utilize o comando seguinte:

    echo $KAFKAZKHOSTS
    

    Este comando devolve informações semelhantes ao texto seguinte:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

  11. Para definir uma variável de ambiente com as informações do anfitrião do mediador do Apache Kafka, utilize o comando seguinte:

    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Este comando requer acesso Ambari. Se o cluster estiver atrás de um NSG, execute este comando a partir de uma máquina que possa acessar o Ambari.

    Este comando não tem saída.

  12. Para verificar se a variável de ambiente está definida corretamente, utilize o comando seguinte:

    echo $KAFKABROKERS
    

    Este comando devolve informações semelhantes ao texto seguinte:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

  13. Copie um dos valores do broker Kafka retornados na etapa anterior para o arquivo python-producer-simulator-template.py na linha 19 e inclua aspas simples ao redor do valor, por exemplo:

    kafkaBrokers = ['wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092']
    
  14. Salve o arquivo python-producer-simulator-template-simulator-template.py.

  15. De volta à janela de conexão ssh, use o seguinte comando para criar um tópico.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic stockVals --zookeeper $KAFKAZKHOSTS
    

Este comando se conecta ao Zookeeper usando as informações do host armazenadas no $KAFKAZKHOSTS. Em seguida, ele cria um tópico Apache Kafka chamado stockVals, para corresponder ao nome do tópico em python-producer-simulator-template.py.

Copie o arquivo python para o nó principal e execute o arquivo para transmitir dados

  1. Em uma nova janela do git, navegue até o local do arquivo python-producer-simulator-template.py e copie o arquivo do computador local para o nó principal principal usando o comando a seguir. Substitua kafka-mslearn-stock pelo nome do cluster Apache Kafka e observe que você deve incluir -ssh após o nome do cluster.

    scp python-producer-simulator-template.py sshuser@kafka-mslearn-stock-ssh.azurehdinsight.net:
    

    Quando lhe for perguntado se pretende continuar a ligar-se, escreva sim. Em seguida, no prompt, digite a senha para o cluster. Após as transferências de arquivos, a seguinte saída é exibida.

    python-producer-simulator-template.py    100% 1896    71.9KB/s   00:00
    
  2. Agora volte para o prompt de comando do Azure, onde você recuperou as informações do broker e execute o seguinte comando para instalar o Kafka:

    sudo pip install kafka-python
    

    Depois que o Kafka é instalado com êxito, a seguinte saída é exibida.

    Installing collected packages: kafka-python
    Successfully installed kafka-python-1.4.7
    
  3. Na mesma janela, instale solicitações usando o seguinte comando:

    sudo apt-get install python-requests
    
  4. Quando perguntado "Após esta operação, 4.327 kB de espaço em disco adicional serão usados. Quer continuar? [S/n]" tipo y.

    Quando as solicitações são instaladas com êxito, uma saída semelhante à seguinte é exibida.

    Setting up python-urllib3 (1.13.1-2ubuntu0.16.04.3) ...
    Setting up python-requests (2.9.1-3ubuntu0.1) ...
    
  5. Na mesma janela, use o seguinte comando para executar o arquivo python

    python python-producer-simulator-template.py
    

    Deverá ver um resultado semelhante ao seguinte:

    No loops argument provided. Default loops are 1000
    Running in simulated mode
    [
    {
        "symbol": "MSFT",
        "size": 355,
        "price": 147.205,
        "time": 1578029521022
    },
    {
        "symbol": "BA",
        "size": 345,
        "price": 352.607,
        "time": 1578029521022
    },
    {
        "symbol": "JNJ",
        "size": 58,
        "price": 142.043,
        "time": 1578029521022
    },
    {
        "symbol": "F",
        "size": 380,
        "price": 8.545,
        "time": 1578029521022
    },
    {
        "symbol": "TSLA",
        "size": 442,
        "price": 329.342,
        "time": 1578029521022
    },
    {
        "symbol": "BAC",
        "size": 167,
        "price": 32.921,
        "time": 1578029521022
    },
    {
        "symbol": "GE",
        "size": 222,
        "price": 11.115,
        "time": 1578029521022
    },
    {
        "symbol": "MMM",
        "size": 312,
        "price": 174.643,
        "time": 1578029521022
    },
    {
        "symbol": "INTC",
        "size": 483,
        "price": 54.978,
        "time": 1578029521022
    },
    {
        "symbol": "WMT",
        "size": 387,
        "price": 120.355,
        "time": 1578029521022
    }
    ]
    stockVals
    2
    0
    stockVals
    1
    0
    stockVals
    3
    0
    stockVals
    2
    1
    stockVals
    7
    0
    stockVals
    7
    1
    stockVals
    1
    1
    stockVals
    4
    0
    stockVals
    4
    1
    stockVals
    1
    2
    

Esta saída fornece os preços de ações simulados para as ações listadas no arquivo python-producer-simulated-template.py, seguido pelo tópico, partição e deslocamento da mensagem no tópico. Você pode ver que cada vez que o produtor é acionado (a cada segundo), um novo lote de preços de ações é gerado e cada nova mensagem é adicionada a uma partição em um determinado deslocamento.