Biblioteca de clientes do Pacote do JobRouter de Comunicação do Azure para Python – versão 1.0.0
Esse pacote contém um SDK do Python para Serviços de Comunicação do Azure para JobRouter. Leia mais sobre Serviços de Comunicação do Azure aqui
Código-fonte | Pacote (Pypi) | Documentação do produto
Aviso de isenção de responsabilidade
O suporte a pacotes python do SDK do Azure para Python 2.7 terminou em 01 de janeiro de 2022. Para obter mais informações e tirar dúvidas, consulte https://github.com/Azure/azure-sdk-for-python/issues/20691
Introdução
Pré-requisitos
Você precisa de uma assinatura do Azure e de um Recurso do Serviço de Comunicação para usar esse pacote.
- É necessário ter o Python 3.7 ou posterior para usar esse pacote. Para obter mais detalhes, leia nossa página na política de suporte do SDK do Azure para Python.
- Para criar um novo Serviço de Comunicação, você pode usar o Portal do Azure, o Azure PowerShell
Instalar o pacote
Instale a biblioteca de clientes do JobRouter de Comunicação do Azure para Python com pip:
pip install azure-communication-jobrouter
Principais conceitos
Trabalho
Um Trabalho representa a unidade de trabalho, que precisa ser roteada para uma Função de Trabalho disponível. Um exemplo real disso pode ser uma chamada de entrada ou chat no contexto de um call center.
Trabalho
Um Worker representa o fornecimento disponível para lidar com um Trabalho. Cada trabalhador se registra com ou mais filas para receber trabalhos. Um exemplo real disso pode ser um agente trabalhando em um call center.
Fila
Uma Fila representa uma lista ordenada de trabalhos aguardando para serem atendidos por um trabalhador. Os trabalhadores se registrarão com uma fila para receber trabalho dela. Um exemplo real disso pode ser uma fila de chamadas em um call center.
Canal
Um Canal representa um agrupamento de trabalhos por algum tipo. Quando um trabalhador se registra para receber trabalho, ele também precisa especificar de quais canais ele pode realizar trabalho e com quanto trabalho de cada um desses canais ele pode lidar simultaneamente.
Um exemplo real disso pode ser voice calls
ou chats
em um call center.
Oferta
Uma Oferta é estendida pelo JobRouter a um trabalho para lidar com um trabalho específico quando determina uma correspondência, essa notificação normalmente é entregue por meio do EventGrid. O trabalhador pode aceitar ou recusar a oferta usando a API do JobRouter ou expirará de acordo com o tempo de vida configurado na política de distribuição. Um exemplo real disso pode ser o toque de um agente em um call center.
Política de Distribuição
Uma Política de Distribuição representa um conjunto de configuração que rege como os trabalhos em uma fila são distribuídos aos trabalhadores registrados com essa fila. Essa configuração inclui quanto tempo uma Oferta é válida antes de expirar e o modo de distribuição, que define a ordem na qual os trabalhos são escolhidos quando há vários disponíveis.
Modo de Distribuição
Os três tipos de modos são
- Round Robin: os trabalhadores são ordenados por
Id
e o próximo trabalhador após o último que recebeu uma oferta é escolhido. - Ocioso por mais tempo: o trabalhador está há mais tempo sem trabalhar em nenhum trabalho.
- Melhor Trabalho: você pode especificar uma expressão para comparar dois trabalhos para determinar qual deles escolher.
Rótulos
Você pode anexar rótulos a trabalhos, trabalhos e filas. Esses são pares de valores de chave que podem ser de , number
ou boolean
tipos de string
dados.
Um exemplo real disso pode ser o nível de habilidade de um trabalho específico ou da equipe ou localização geográfica.
Seletores de rótulo
Os seletores de rótulo podem ser anexados a um trabalho para direcionar um subconjunto de trabalhos que atendem à fila. Um exemplo real disso pode ser uma condição em uma chamada de entrada que o agente deve ter um nível mínimo de conhecimento de um produto específico.
Política de classificação
Uma política de classificação pode ser usada para selecionar dinamicamente uma fila, determinar a prioridade do trabalho e anexar seletores de rótulo de trabalho a um trabalho aproveitando um mecanismo de regras.
Política de exceção
Uma política de exceção controla o comportamento de um trabalho com base em um gatilho e executa uma ação desejada. A política de exceção é anexada a uma Fila para que ela possa controlar o comportamento de Trabalhos na Fila.
Exemplos
Inicialização do cliente
Para inicializar o cliente SMS, o cadeia de conexão pode ser usado para instanciar. Como alternativa, você também pode usar a autenticação do Active Directory usando DefaultAzureCredential.
from azure.communication.jobrouter import (
JobRouterClient,
JobRouterAdministrationClient
)
connection_string = "endpoint=ENDPOINT;accessKey=KEY"
router_client = JobRouterClient.from_connection_string(conn_str = connection_string)
router_admin_client = JobRouterAdministrationClient.from_connection_string(conn_str = connection_string)
Política de Distribuição
Antes de criarmos uma Fila, precisamos de uma Política de Distribuição.
from azure.communication.jobrouter.models import (
LongestIdleMode,
DistributionPolicy
)
distribution_policy: DistributionPolicy = DistributionPolicy(
offer_expires_after_seconds = 24 * 60 * 60,
mode = LongestIdleMode(
min_concurrent_offers = 1,
max_concurrent_offers = 1
)
)
distribution_policy: DistributionPolicy = router_admin_client.upsert_distribution_policy(
"distribution-policy-1",
distribution_policy
)
Fila
Em seguida, podemos criar a fila.
from azure.communication.jobrouter.models import (
RouterQueue
)
queue: RouterQueue = RouterQueue(
distribution_policy_id = "distribution-policy-1"
)
queue: RouterQueue = router_admin_client.upsert_queue(
"queue-1",
queue
)
Trabalho
Agora, podemos enviar um trabalho diretamente para essa fila, com um seletor de trabalhador que exige que o trabalhador tenha o rótulo Some-Skill
maior que 10.
from azure.communication.jobrouter.models import (
RouterJob,
RouterWorkerSelector,
LabelOperator
)
router_job: RouterJob = RouterJob(
channel_id = "my-channel",
queue_id = "queue-1",
channel_reference = "12345",
priority = 1,
requested_worker_selectors = [
RouterWorkerSelector(key = "Some-Skill", label_operator = LabelOperator.EQUAL, value = 10)
]
)
job: RouterJob = router_client.upsert_job(
"jobId-1",
router_job
)
Trabalho
Agora, registramos um trabalho para receber trabalho dessa fila, com um rótulo igual Some-Skill
a 11.
from azure.communication.jobrouter.models import (
RouterWorker,
RouterChannel
)
router_worker: RouterWorker = RouterWorker(
capacity = 1,
queues = [
"queue-1"
],
labels = {
"Some-Skill": 11
},
channels = [
RouterChannel(channel_id = "my-channel", capacity_cost_per_job = 1)
],
available_for_offers = True
)
worker = router_client.upsert_worker(
"worker-1",
router_worker
)
Oferta
Devemos obter um RouterWorkerOfferIssued de nossa assinatura do EventGrid.
Há vários serviços diferentes do Azure que atuam como um manipulador de eventos. Para esse cenário, vamos supor webhooks para entrega de eventos. Saiba mais sobre a entrega de eventos do Webhook
Depois que os eventos forem entregues ao manipulador de eventos, poderemos desserializar o conteúdo JSON em uma lista de eventos.
# Parse the JSON payload into a list of events
from azure.eventgrid import EventGridEvent
import json
## deserialize payload into a list of typed Events
events = [EventGridEvent.from_json(json.loads(msg)) for msg in payload]
offer_id = ""
for event in events:
if event.event_type == "Microsoft.Communication.RouterWorkerOfferIssued":
offer_id = event.data.offer_id
else:
continue
No entanto, também podemos aguardar alguns segundos e, em seguida, consultar o trabalhador diretamente na API do JobRouter para ver se uma oferta foi emitida para ela.
from azure.communication.jobrouter.models import (
RouterWorker,
)
router_worker: RouterWorker = router_client.get_worker(worker_id = "worker-1")
for offer in router_worker.offers:
print(f"Worker {router_worker.id} has an active offer for job {offer.job_id}")
Aceitar uma oferta
Depois que um trabalhador recebe uma oferta, ele pode executar duas ações possíveis: aceitar ou recusar. Vamos aceitar a oferta.
from azure.communication.jobrouter.models import (
RouterJobOffer,
AcceptJobOfferResult,
RouterJobStatus
)
# fetching the offer id
job_offer: RouterJobOffer = [offer for offer in router_worker.offers if offer.job_id == "jobId-1"][0]
offer_id = job_offer.offer_id
# accepting the offer sent to `worker-1`
accept_job_offer_result: AcceptJobOfferResult = router_client.accept_job_offer(
worker_id = "worker-1",
offer_id = offer_id
)
print(f"Offer: {job_offer.offer_id} sent to worker: {router_worker.id} has been accepted")
print(f"Job has been assigned to worker: {router_worker.id} with assignment: {accept_job_offer_result.assignment_id}")
# verify job assignment is populated when querying job
updated_job = router_client.get_job(job_id = "jobId-1")
print(f"Job assignment has been successful: {updated_job.job_status == RouterJobStatus.Assigned and accept_job_offer_result.assignment_id in updated_job.assignments}")
Concluindo um trabalho
Depois que o trabalho for concluído com o trabalho, o trabalho precisará marcar o trabalho como completed
.
import datetime
from azure.communication.jobrouter.models import (
CompleteJobOptions
)
complete_job_result = router_client.complete_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CompleteJobOptions(
note = f"Job has been completed by {router_worker.id} at {datetime.datetime.utcnow()}"
)
)
print(f"Job has been successfully completed.")
Fechando um trabalho
Depois que um trabalho for concluído, o trabalhador poderá executar ações de encapsulamento para o trabalho antes de fechar o trabalho e finalmente liberar sua capacidade de aceitar mais trabalhos de entrada
from azure.communication.jobrouter.models import (
RouterJob,
RouterJobStatus,
CloseJobOptions,
)
close_job_result = router_client.close_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CloseJobOptions(
note = f"Job has been closed by {router_worker.id} at {datetime.datetime.utcnow()}"
)
)
print(f"Job has been successfully closed.")
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
import time
from datetime import datetime, timedelta
from azure.communication.jobrouter.models import (
RouterJob,
RouterJobStatus,
CloseJobOptions,
)
close_job_in_future_result = router_client.close_job(
"jobId-1",
accept_job_offer_result.assignment_id,
CloseJobOptions(
note = f"Job has been closed by {router_worker.id} at {datetime.utcnow()}",
close_at = datetime.utcnow() + timedelta(seconds = 2)
)
)
print(f"Job has been marked to close")
time.sleep(secs = 2)
update_job: RouterJob = router_client.get_job(job_id = "jobId-1")
print(f"Updated job status: {update_job.job_status == RouterJobStatus.CLOSED}")
Solução de problemas
Está com problemas? Esta seção deve conter detalhes sobre o que fazer lá.
Próximas etapas
Mais códigos de exemplo
Examine o diretório de exemplos para obter exemplos detalhados de como usar essa biblioteca.
Forneça comentários
Se você encontrar bugs ou tiver sugestões, registre um problema na seção Problemas do projeto
Contribuição
Este projeto aceita contribuições e sugestões. A maioria das contribuições exige que você concorde com um CLA (Contrato de Licença do Colaborador) declarando que você tem o direito de nos conceder, e de fato concede, os direitos de usar sua contribuição. Para obter detalhes, visite cla.microsoft.com.
Este projeto adotou o Código de Conduta de Software Livre da Microsoft. Para obter mais informações, confira as Perguntas frequentes sobre o Código de Conduta ou contate opencode@microsoft.com para enviar outras perguntas ou comentários.
Azure SDK for Python