Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Importante
Esta página inclui instruções para gerenciar componentes das Operações do Azure IoT usando manifestos de implantação do Kubernetes, que estão em versão prévia. Esse recurso é fornecido com várias limitações e não deve ser usado para cargas de trabalho de produção.
Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão preliminar ou que, de outra forma, ainda não foram lançados em disponibilidade geral.
Para configurar a comunicação bidirecional entre as Operações do Azure IoT e os agentes do Apache Kafka, você pode configurar um ponto de extremidade do fluxo de dados. Essa configuração permite especificar o ponto de extremidade, o protocolo TLS, a autenticação e outras configurações.
Pré-requisitos
- Uma instância das Operações do Azure IoT
Hubs de Eventos do Azure
Os Hubs de Eventos do Azure são compatíveis com o protocolo do Kafka e podem ser usados com fluxos de dados com algumas limitações.
Criar um namespace e um hub de eventos dos Hubs de Eventos do Azure
Primeiro, crie um namespace dos Hubs de Eventos do Azure habilitado para Kafka
Próximo, crie um hub de eventos no namespace. Cada hub de eventos individual corresponde a um tópico do Kafka. Você pode criar vários hubs de eventos no mesmo namespace para representar vários tópicos do Kafka.
Atribuir permissão à identidade gerenciada
Para configurar um ponto de extremidade do fluxo de dados para os Hubs de Eventos do Azure, recomendamos usar uma identidade gerenciada atribuída pelo usuário ou atribuída pelo sistema. Essa abordagem é segura e elimina a necessidade de gerenciar credenciais manualmente.
Depois que o namespace e o hub de eventos dos Hubs de Eventos do Azure forem criados, você precisará atribuir uma função à identidade gerenciada das Operações IoT do Azure que concede permissão para enviar ou receber mensagens para o hub de eventos.
Se estiver usando a identidade gerenciada atribuída pelo sistema, no portal do Azure, vá para sua instância das Operações do Azure IoT e selecione Visão geral. Copie o nome da extensão listada após Extensão do Arc de Operações do Azure IoT. Por exemplo, azure-iot-operations-xxxx7. Sua identidade gerenciada atribuída pelo sistema pode ser encontrada usando o mesmo nome da extensão Operações do Azure IoT para Arc.
Em seguida, acesse o namespace dos Hubs de Eventos >Controle de acesso (IAM)>Adicionar atribuição de função.
- Na guia Função , selecione uma função apropriada como
Azure Event Hubs Data SenderouAzure Event Hubs Data Receiver. Isso fornece à identidade gerenciada as permissões necessárias para enviar ou receber mensagens para todos os hubs de eventos no namespace. Para saber mais, veja Autenticar um aplicativo com o Microsoft Entra ID para acessar recursos dos Hubs de Eventos. - Na guia Membros:
- Se estiver usando a identidade gerenciada atribuída pelo sistema, em Atribuir acesso a, selecione a opção Usuário, grupo ou entidade de serviço, selecione + Selecionar membros e pesquise o nome da extensão Operações do Azure IoT para Arc.
- Se estiver usando a identidade gerenciada atribuída pelo usuário, em Atribuir acesso a, selecione a opção Identidade Gerenciada e selecione + Selecionar membros e pesquise identidade gerenciada atribuída pelo usuário configurada para conexões de nuvem.
Criar um ponto de extremidade do fluxo de dados para os Hubs de Eventos do Azure
Depois que o namespace e o hub de eventos dos Hubs de Eventos do Azure forem configurados, você poderá criar um endpoint de fluxo de dados para o namespace dos Hubs de Eventos do Azure habilitado para uso com Kafka.
Na experiência de operações, selecione a guia Pontos de extremidade do fluxo de dados.
Em Criar novo ponto de extremidade de fluxo de dados, selecione Hubs de Eventos do Azure>Novo.
Insira as seguintes configurações para o ponto de extremidade:
Configuração Descrição Nome O nome do ponto de extremidade do fluxo de dados. Anfitrião O nome do host dos Hubs de Eventos. Você pode pesquisar um host existente dos Hubs de Eventos ou inserir o nome do host manualmente usando o formato <NAMESPACE>.servicebus.windows.net.Porto A porta do host dos Hubs de Eventos. Para Hubs de Eventos, a porta é 9093.Método de autenticação O método usado para autenticação. Recomendamos que você escolha identidade gerenciada atribuída pelo sistema ou identidade gerenciada atribuída pelo usuário. Selecione Aplicar para provisionar o ponto de extremidade.
Observação
O tópico do Kafka, ou um hub de eventos individual, é configurado posteriormente quando você cria o fluxo de dados. O tópico do Kafka é o destino das mensagens de fluxo de dados.
Usar uma cadeia de conexão para autenticação nos Hubs de Eventos
Importante
Para usar a interface da Web da experiência de operações para gerenciar segredos, primeiro, as Operações do Azure IoT precisam ser habilitadas com as configurações seguras por meio da definição de um Azure Key Vault e da habilitação de identidades de carga de trabalho. Para saber mais, veja Habilitar as configurações seguras na implantação das Operações do Azure IoT.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Informações Básicas e escolha Método de autenticação>SASL.
Insira as seguintes configurações para o ponto de extremidade:
| Configuração | Descrição |
|---|---|
| Tipo de SASL | Escolha Plain. |
| Nome do segredo sincronizado | Insira o nome do segredo do Kubernetes que contém a cadeia de conexão. |
| Referência de nome de usuário ou segredo do token | A referência ao nome de usuário ou ao segredo do token usado para a autenticação SASL. Escolha-o na lista do Key Vault ou crie um. O valor precisa ser $ConnectionString. |
| Referência de senha do segredo do token | A referência à senha ou ao segredo do token usado para a autenticação SASL. Escolha-o na lista do Key Vault ou crie um. O valor deve estar no formato de Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>. |
Depois de selecionar Adicionar referência, se você selecionar Criar, insira as seguintes configurações:
| Configuração | Descrição |
|---|---|
| Nome do segredo | O nome do segredo no Azure Key Vault. Escolha um nome que seja fácil de lembrar para selecionar o segredo posteriormente na lista. |
| Valor do segredo | Para o nome de usuário, insira $ConnectionString. Para a senha, insira a cadeia de conexão no formato Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY-NAME>;SharedAccessKey=<KEY>. |
| Definir a data de ativação | Se ativada, a data em que o segredo se torna ativo. |
| Definir a data de validade | Se ativado, a data em que o segredo expira. |
Para saber mais sobre segredos, veja Criar e gerenciar segredos nas Operações do Azure IoT.
Limitações
Os Hubs de Eventos do Azure não dão suporte a todos os tipos de compactação aos quais o Kafka dá suporte. No momento, só há suporte para a compactação GZIP em camadas premium e dedicadas dos Hubs de Eventos do Azure. O uso de outros tipos de compactação pode resultar em erros.
Agentes personalizados do Kafka
Para configurar um ponto de extremidade do fluxo de dados para agentes do Kafka que não sejam do Hub de Eventos, defina o host, o TLS, a autenticação e outras configurações conforme necessário.
Na experiência de operações, selecione a guia Pontos de extremidade do fluxo de dados.
Em Criar novo ponto de extremidade do fluxo de dados, selecione Agente Kafka personalizado>Novo.
Insira as seguintes configurações para o ponto de extremidade:
Configuração Descrição Nome O nome do ponto de extremidade do fluxo de dados. Anfitrião O nome do host do agente Kafka no formato <Kafka-broker-host>:xxxx. Inclua o número da porta na configuração do host.Método de autenticação O método usado para autenticação. Escolha SASL. Tipo de SASL O tipo de autenticação SASL. Selecione Simples, ScramSha256 ou ScramSha512. Necessário se estiver usando SASL. Nome do segredo sincronizado O nome do segredo. Necessário se estiver usando SASL. Referência de nome de usuário do segredo do token A referência ao nome de usuário no segredo do token do SASL. Necessário se estiver usando SASL. Selecione Aplicar para provisionar o ponto de extremidade.
Observação
Atualmente, a experiência de operações não dá suporte ao uso de um ponto de extremidade do fluxo de dados do Kafka como fonte. É possível criar um fluxo de dados com um ponto de extremidade do fluxo de dados do Kafka como fonte usando o Kubernetes ou o Bicep.
Para personalizar as configurações do ponto de extremidade, use as seções a seguir para obter mais informações.
Métodos de autenticação disponíveis
Os métodos de autenticação a seguir estão disponíveis para pontos de extremidade do fluxo de dados do agente do Kafka.
Identidade gerenciada atribuída pelo sistema
Antes de configurar o ponto de extremidade do fluxo de dados, atribua uma função à identidade gerenciada das Operações do Azure IoT que concede permissão para se conectar ao agente do Kafka:
- No portal do Azure, acesse sua instância das Operações do Azure IoT e selecione Visão geral.
- Copie o nome da extensão listada após Extensão do Arc de Operações do Azure IoT. Por exemplo, azure-iot-operations-xxxx7.
- Acesse o recurso de nuvem que você precisa para conceder permissões. Por exemplo, vá para o namespace dos Hubs de Eventos >Controle de acesso (IAM)>Adicionar atribuição de função.
- Na guia Função , selecione uma função apropriada.
- Na guia Membros, para Atribuir acesso a, selecione Usuário, grupo ou opção de entidade de serviço e selecione + Selecionar membros e pesquise a identidade gerenciada das Operações do Azure IoT. Por exemplo, azure-iot-operations-xxxx7.
Em seguida, defina o ponto de extremidade do fluxo de dados com as configurações da identidade gerenciada atribuída pelo sistema.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Informações Básicas e escolha Método de autenticação>Identidade gerenciada atribuída pelo sistema.
Essa configuração cria uma identidade gerenciada com o público-alvo padrão, que é o mesmo que o valor do host do namespace dos Hubs de Eventos, na forma de https://<NAMESPACE>.servicebus.windows.net. No entanto, caso precise substituir o público-alvo padrão, defina o campo audience como o valor desejado.
Não há suporte na experiência de operações.
Identidade gerenciada atribuída pelo usuário
Para usar a identidade gerenciada atribuída pelo usuário para autenticação, você deve primeiro implantar o Operações do Azure IoT com as configurações seguras habilitadas. Em seguida, você precisa configurar uma identidade gerenciada atribuída pelo usuário para conexões em nuvem. Para saber mais, veja Habilitar as configurações seguras na implantação das Operações do Azure IoT.
Antes de configurar o ponto de extremidade do fluxo de dados, atribua uma função à identidade gerenciada atribuída pelo usuário que concede permissão para se conectar ao agente do Kafka:
- No portal do Azure, vá para o recurso de nuvem que você precisa para conceder permissões. Por exemplo, acesse o namespace da Grade de Eventos >Controle de acesso (IAM)>Adicionar atribuição de função.
- Na guia Função , selecione uma função apropriada.
- Na guia Membros, para Atribuir acesso a, selecione a opção Identidade Gerenciada e selecione + Selecionar membros e pesquise sua identidade gerenciada atribuída pelo usuário.
Em seguida, defina o ponto de extremidade do fluxo de dados com as configurações de identidade gerenciada atribuída pelo usuário.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Informações Básicas e escolha Método de autenticação>Identidade gerenciada atribuída pelo usuário.
Aqui, o escopo é o público-alvo da identidade gerenciada. O valor padrão é o mesmo que o valor do host do namespace do Hubs de Eventos do Azure no formato de https://<NAMESPACE>.servicebus.windows.net. No entanto, se você precisar substituir o público-alvo padrão, defina o campo de escopo para o valor desejado usando o Bicep ou o Kubernetes.
SASL
Para usar o SASL para autenticação, especifique o método de autenticação SASL e configure o tipo SASL e uma referência secreta com o nome do segredo que contém o token SASL.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Informações Básicas e escolha Método de autenticação>SASL.
Insira as seguintes configurações para o ponto de extremidade:
| Configuração | Descrição |
|---|---|
| Tipo de SASL | O tipo de autenticação SASL a ser usada. Os tipos com suporte são Plain, ScramSha256, e ScramSha512. |
| Nome do segredo sincronizado | O nome do segredo do Kubernetes que contém o token de SASL. |
| Referência de nome de usuário ou segredo do token | A referência ao nome de usuário ou ao segredo do token usado para a autenticação SASL. |
| Referência de senha do segredo do token | A referência à senha ou ao segredo do token usado para a autenticação SASL. |
Os tipos de SASL com suporte são:
PlainScramSha256ScramSha512
O segredo precisa estar no mesmo namespace do ponto de extremidade do fluxo de dados do Kafka. O segredo precisa ter o token de SASL como um par chave-valor.
Anônima
Para usar a autenticação anônima, atualize a seção de autenticação das configurações do Kafka para usar o método Anônimo.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Informações Básicas e escolha Método de autenticação>Nenhum.
Configurações avançadas
É possível definir configurações avançadas para o ponto de extremidade do fluxo de dados do Kafka, como TLS, Certificado de Autoridade de Certificação Confiável, configurações de mensagens do Kafka, envio em lote e CloudEvents. Você pode definir essas configurações na guia do portal do ponto de extremidade do fluxo de dados Avançado ou no recurso do ponto de extremidade do fluxo de dados.
Na experiência de operações, selecione a guia Avançado para o ponto de extremidade do fluxo de dados.
Configurações do protocolo TLS
Modo TLS
Para habilitar ou desabilitar o TLS para o ponto de extremidade do Kafka, atualize a configuração mode nas configurações do TLS.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use a caixa de seleção ao lado de Modo TLS habilitado.
O modo TLS pode ser definido como Enabled ou Disabled. Se o modo estiver definido como Enabled, o fluxo de dados usará uma conexão segura com o agente do Kafka. Se o modo estiver definido como Disabled, o fluxo de dados usará uma conexão não segura com o agente do Kafka.
Certificado de Autoridade de Certificação confiável
Configure o certificado de Autoridade de Certificação confiável para o ponto de extremidade do Kafka para estabelecer uma conexão segura com o agente do Kafka. Essa configuração é importante se o agente do Kafka usa um certificado autoassinado ou um certificado assinado por uma AC personalizada que não é confiável por padrão.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Configuração de certificado de autoridade de certificação confiável para especificar o ConfigMap que contém o certificado de autoridade de certificação confiável.
Esse ConfigMap deve conter o Certificado de Autoridade de Certificação no formato PEM. O ConfigMap precisa estar no mesmo namespace que o recurso de fluxo de dados do Kafka. Por exemplo:
kubectl create configmap client-ca-configmap --from-file root_ca.crt -n azure-iot-operations
Dica
Ao se conectar aos Hubs de Eventos do Azure, o certificado de Autoridade de Certificação não é necessário porque o serviço Hubs de Eventos usa um certificado assinado por uma AC pública confiável por padrão.
ID do grupo de consumidores
A ID do grupo de consumidores é usada para identificar o grupo de consumidores que o fluxo de dados usa para fazer a leitura das mensagens do tópico do Kafka. A ID do grupo de consumidores deve ser exclusiva no agente Kafka.
Importante
Quando o ponto de extremidade do Kafka é usado como origem, o ID do grupo de consumidores é necessário. Caso contrário, o fluxo de dados não poderá ler mensagens do tópico do Kafka, e você receberá um erro “Os pontos de extremidade de origem do tipo Kafka precisam ter um consumerGroupId definido”.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo ID do grupo de consumidores para especificar a ID do grupo de consumidores.
Essa configuração só entrará em vigor se o ponto de extremidade for usado como uma fonte (ou seja, o fluxo de dados é um consumidor).
Compactação
O campo de compactação permite a compactação para as mensagens enviadas aos tópicos do Kafka. A compactação ajuda a reduzir a largura de banda da rede e o espaço de armazenamento necessários para a transferência de dados. No entanto, a compactação também adiciona alguma sobrecarga e latência ao processo. Os tipos de compactação com suporte estão listados na tabela a seguir.
| Valor | Descrição |
|---|---|
None |
Nenhuma compactação ou lote é aplicado. Nenhum será o valor padrão se nenhuma compactação for especificada. |
Gzip |
A compactação GZIP e o processamento em lote são aplicados. GZIP é um algoritmo de compressão de uso geral que oferece um bom equilíbrio entre taxa de compressão e velocidade. No momento, só há suporte para a compactação GZIP em camadas premium e dedicadas dos Hubs de Eventos do Azure. |
Snappy |
A compactação rápida e o processamento em lote são aplicados. Snappy é um algoritmo de compressão rápida que oferece taxa de compressão moderada e velocidade. Não há suporte para esse modo de compactação nos Hubs de Eventos do Azure. |
Lz4 |
A compactação LZ4 e o processamento em lote são aplicados. LZ4 é um algoritmo de compressão rápida que oferece baixa taxa de compressão e alta velocidade. Não há suporte para esse modo de compactação nos Hubs de Eventos do Azure. |
Para configurar a compactação:
Na página de configurações do ponto de extremidade do fluxo de dados de experiência de operações, selecione a guia Avançado e use o campo Compactação para especificar o tipo de compactação.
Essa configuração entrará em vigor somente se o ponto de extremidade for usado como um destino em que o fluxo de dados é um produtor.
Envio em lote
Além da compactação, você também pode configurar o envio em lote de mensagens antes de enviá-los para os tópicos do Kafka. O envio em lote permite agrupar várias mensagens e compactá-las como uma só unidade, o que pode aprimorar a eficiência da compactação e reduzir a sobrecarga de rede.
| Campo | Descrição | Obrigatório |
|---|---|---|
mode |
Pode ser Enabled ou Disabled. O valor padrão é Enabled porque o Kafka não tem a noção de mensagens sem lotes. Se definido como Disabled, o processamento em lote será minimizado para criar um lote com uma só mensagem por vez. |
Não |
latencyMs |
O intervalo de tempo máximo em milissegundos em que as mensagens podem ser armazenadas em buffer antes de serem enviadas. Se esse intervalo for atingido, todas as mensagens em buffer serão enviadas como um lote, independentemente de quantas ou quão grandes elas sejam. Se não for definido, o valor padrão será 5. | Não |
maxMessages |
O número máximo de mensagens que podem ser armazenadas em buffer antes de serem enviadas. Se esse número for atingido, todas as mensagens armazenadas em buffer serão enviadas como um lote, independentemente do tamanho ou da duração do armazenamento em buffer. Se não for definido, o valor padrão será 100.000. | Não |
maxBytes |
O tamanho máximo em bytes que pode ser armazenado em buffer antes de ser enviado. Se esse tamanho for atingido, todas as mensagens armazenadas em buffer serão enviadas como um lote, independentemente de quantas ou por quanto tempo elas permanecerão armazenadas em buffer. O valor padrão é 1.000.000 (1 MB). | Não |
Por exemplo, se você definir latencyMs como 1000, maxMessages como 100 e maxBytes como 1024, as mensagens serão enviadas quando houver 100 mensagens no buffer, ou quando houver 1.024 bytes no buffer, ou quando se passarem 1.000 milissegundos desde o último envio, o que ocorrer primeiro.
Para configurar o envio em lote:
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Habilitado para envio em lote para habilitar o envio em lote. Use os campos Latência do envio em lote, Máximo de bytes e Contagem de mensagens para especificar as configurações do envio em lote.
Essa configuração entrará em vigor somente se o ponto de extremidade for usado como um destino em que o fluxo de dados é um produtor.
Estratégia de processamento de partições
A estratégia de tratamento de partição controla como as mensagens são atribuídas às partições do Kafka ao enviá-las para tópicos do Kafka. As partições do Kafka são segmentos lógicos de um tópico do Kafka que permitem processamento paralelo e tolerância a falhas. Cada mensagem em um tópico do Kafka tem uma partição e um deslocamento que são usados para identificar e ordenar as mensagens.
Essa configuração entrará em vigor somente se o ponto de extremidade for usado como um destino em que o fluxo de dados é um produtor.
Por padrão, um fluxo de dados atribui mensagens a partições aleatórias, usando um algoritmo round-robin. Porém, você pode usar estratégias diferentes para atribuir mensagens a partições com base em alguns critérios, como o nome do tópico MQTT ou uma propriedade de mensagem MQTT. Isso pode ajudar você a obter melhor balanceamento de carga, localidade de dados ou ordenação de mensagens.
| Valor | Descrição |
|---|---|
Default |
Atribui mensagens a partições aleatórias, usando um algoritmo round-robin. Este será o valor padrão se nenhuma estratégia for especificada. |
Static |
Atribui mensagens a um número de partição fixo derivado da ID do fluxo de dados. Isso significa que cada instância do fluxo de dados envia mensagens para uma partição diferente. Isso pode ajudar a obter melhor balanceamento de carga e localidade de dados. |
Topic |
Usa o nome do tópico MQTT da fonte de fluxo de dados como a chave para particionamento. Isso significa que as mensagens com o mesmo nome de tópico MQTT são enviadas para a mesma partição. Isso pode ajudar a obter uma melhor ordenação de mensagens e localidade de dados. |
Property |
Usa uma propriedade de mensagem MQTT da fonte de fluxo de dados como a chave para particionamento. Especifique o nome da propriedade no campo partitionKeyProperty. Isso significa que as mensagens com o mesmo valor de propriedade são enviadas para a mesma partição. Isso pode ajudar a obter uma melhor ordenação de mensagens e localidade de dados com base em um critério personalizado. |
Por exemplo, se você definir a estratégia de processamento de partições como Property e a propriedade da chave de partição como device-id, as mensagens com a mesma propriedade device-id serão enviadas para a mesma partição.
Para configurar a estratégia de processamento de partição:
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Estratégia de processamento de partições para especificar a estratégia de processamento de partições. Use o campo Propriedade da chave de partição para especificar a propriedade usada para particionamento se a estratégia estiver definida como Property.
Reconhecimentos do Kafka
Os reconhecimentos do Kafka (acks) são usados para controlar a durabilidade e a consistência das mensagens enviadas aos tópicos do Kafka. Quando um produtor envia uma mensagem para um tópico do Kafka, ele pode solicitar diferentes níveis de confirmações do agente do Kafka para garantir que a mensagem seja gravada com sucesso no tópico e replicada no cluster do Kafka.
Essa configuração só entrará em vigor se o ponto de extremidade for usado como um destino (ou seja, o fluxo de dados é um produtor).
| Valor | Descrição |
|---|---|
None |
O fluxo de dados não aguarda as confirmações do agente do Kafka. Essa configuração é a opção mais rápida, mas menos durável. |
All |
O fluxo de dados aguarda que a mensagem seja gravada na partição de líder e em todas as partições de seguidor. Essa configuração é a opção mais lenta, porém mais durável. Essa configuração também é a opção padrão |
One |
O fluxo de dados aguarda que a mensagem seja gravada na partição de líder e, pelo menos, uma partição de seguidor. |
Zero |
O fluxo de dados aguarda que a mensagem seja gravada na partição líder, mas não espera por nenhuma confirmação dos seguidores. Isso é mais rápido do que One, mas menos durável. |
Por exemplo, se você definir a confirmação do Kafka como All, o fluxo de dados aguardará que a mensagem seja gravada na partição líder e em todas as partições seguidoras antes de enviar a próxima mensagem.
Para configurar as confirmações do Kafka:
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Confirmação do Kafka para especificar o nível de confirmação do Kafka.
Essa configuração só entra em vigor se o ponto de extremidade for usado como um destino o local em que o fluxo de dados é um produtor.
Copiar as propriedades do MQTT
Por padrão, a configuração de propriedades do MQTT de cópia está habilitada. Essas propriedades do usuário incluem valores como subject, que armazena o nome do ativo que está enviando a mensagem.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e, em seguida, use a caixa de seleção ao lado de copiar propriedades MQTT campo para habilitar ou desabilitar a cópia das propriedades do MQTT.
As seções a seguir descrevem como as propriedades do MQTT são traduzidas para cabeçalhos de usuário do Kafka e vice-versa quando a configuração está habilitada.
O ponto de extremidade do Kafka é um destino
Quando um ponto de extremidade do Kafka é um destino de fluxo de dados, todas as propriedades definidas pela especificação MQTT v5 são cabeçalhos de usuário do Kafka convertidos. Por exemplo, uma mensagem do MQTT v5 com “Tipo de Conteúdo” sendo encaminhada ao Kafka é convertida no cabeçalho do usuário"Content Type":{specifiedValue} do Kafka. Regras semelhantes se aplicam a outras propriedades do MQTT internas, definidas na tabela a seguir.
| Propriedade do MQTT | Comportamento traduzido |
|---|---|
| Indicador de formato de conteúdo | Chave: "Indicador de Formato de Conteúdo" Valor: “0” (o conteúdo é bytes) ou “1” (o conteúdo é UTF-8) |
| Tópico de resposta | Chave: "Tópico de Resposta" Valor: Cópia do Tópico de Resposta da mensagem original. |
| Intervalo de Expiração da Mensagem | Chave: "Intervalo de Expiração da Mensagem" Valor: representação UTF-8 de número de segundos antes da mensagem expirar. Consulte a propriedade de Intervalo de Expiração de Mensagem para obter mais detalhes. |
| Dados de correlação: | Chave: "Dados de Correlação" Valor: cópia dos Dados de Correlação da mensagem original. Ao contrário de muitas propriedades do MQTT v5 codificadas em UTF-8, os dados de correlação podem ser dados arbitrários. |
| Tipo de conteúdo: | Chave: “Tipo de Conteúdo” Valor: cópia do Tipo de Conteúdo da mensagem original. |
Os pares de valor da chave de propriedade do usuário do MQTT v5 são traduzidos diretamente para cabeçalhos de usuário do Kafka. Se um cabeçalho de usuário em uma mensagem tiver o mesmo nome de uma propriedade do MQTT interna (por exemplo, um cabeçalho de usuário chamado “Dados de Correlação”), então, se o encaminhamento do valor da propriedade de especificação MQTT v5 ou a propriedade do usuário será indefinido.
Os fluxos de dados nunca recebem essas propriedades de um Agente MQTT. Assim, um fluxo de dados nunca os encaminha:
- Alias do Tópico
- Identificadores de assinatura
A propriedade de Intervalo de Expiração da Mensagem
O Intervalo de Expiração da Mensagem especifica por quanto tempo uma mensagem pode permanecer em um Agente MQTT antes de ser descartada.
Quando um fluxo de dados recebe uma mensagem MQTT com o Intervalo de Expiração da Mensagem especificado, ele:
- Registra a hora em que a mensagem foi recebida.
- Antes que a mensagem seja emitida para o destino, o tempo é subtraído da mensagem que foi colocada na fila do tempo de intervalo de expiração original.
- Se a mensagem não tiver expirado (a operação acima é > 0), a mensagem será emitida para o destino e conterá o Tempo de Expiração da Mensagem atualizado.
- Se a mensagem tiver expirado (a operação acima é <= 0), então, a mensagem não será emitida pelo Destino.
Exemplos:
- Um fluxo de dados recebe uma mensagem do MQTT com um Intervalo de Expiração da Mensagem = 3.600 segundos. O destino correspondente está temporariamente desconectado, mas pode se reconectar. 1.000 segundos passam antes que essa mensagem MQTT seja enviada para o Destino. Neste caso, a mensagem do destino tem seu Intervalo de Expiração de Mensagem definido como 2.600 (3.600 a 1.000) segundos.
- O fluxo de dados recebe uma mensagem MQTT com Intervalo de Expiração de Mensagem = 3.600 segundos. O destino correspondente está temporariamente desconectado, mas pode se reconectar. Neste caso, no entanto, ele leva 4.000 segundos para se reconectar. A mensagem expirou e o fluxo de dados não encaminha essa mensagem para o destino.
O ponto de extremidade do Kafka é uma fonte de fluxo de dados
Observação
Há um problema conhecido ao usar o ponto de extremidade dos Hubs de Eventos como uma fonte de fluxo de dados em que o cabeçalho do Kafka é corrompido como convertido para MQTT. Isso só acontecerá caso esteja usando os Hubs de Eventos no entanto, o cliente dos Hubs de Eventos que usa o AMQP sob as coberturas. Por exemplo, “foo”=”bar”, “foo” é convertido, mas o valor passa a ser “\xa1\x03bar”.
Quando um ponto de extremidade do Kafka é uma fonte de fluxo de dados, os cabeçalhos de usuário do Kafka são convertidos para propriedades do MQTT v5. A tabela a seguir descreve como os cabeçalhos de usuário do Kafka são convertidos para propriedades do MQTT v5.
| Cabeçalho do Kafka | Comportamento traduzido |
|---|---|
| Chave | Chave: "Chave" Valor: cópia da Chave da mensagem original. |
| Carimbo de data/hora | Chave: "Carimbo de data/hora" Valor: codificação UTF-8 do Carimbo de Data/Hora do Kafka, que é o número de milissegundos desde a época do UNIX. |
Os pares chave/valor do cabeçalho do usuário do Kafka, desde que estejam todos codificados em UTF-8, são traduzidos diretamente em propriedades de chave/valor do usuário do MQTT.
UTF-8/Incompatibilidades binárias
O MQTT v5 só pode dar suporte a propriedades baseadas em UTF-8. Se o fluxo de dados receber uma mensagem do Kafka que contenha um ou mais cabeçalhos não UTF-8, o fluxo de dados será:
- Remova as propriedades inválidas.
- Encaminhe o restante da mensagem, seguindo as regras anteriores.
Os aplicativos que exigem a transferência binária em cabeçalhos de origem do Kafka => propriedades de destino do MQTT precisam primeiro codificá-los em UTF-8, por exemplo, por meio do Base64.
Incompatibilidades da propriedade >=64KB
As propriedades do MQTT v5 precisam ser menores que 64 KB. Se o fluxo de dados receber uma mensagem do Kafka que contenha um ou mais cabeçalhos que seja >= 64 KB, o fluxo de dados será:
- Remova as propriedades inválidas.
- Encaminhe o restante da mensagem, seguindo as regras anteriores.
Conversão de propriedades ao usar os Hubs de Eventos e os produtores que usam o AMQP
Caso você tenha um cliente que encaminha mensagens para um ponto de extremidade de fonte de fluxo de dados do Kafka executando uma das seguintes ações:
- Enviar mensagens para os Hubs de Eventos usando bibliotecas de clientes como Azure.Messaging.EventHubs
- Usando o AMQP diretamente
Há nuances de tradução de propriedade para estar ciente.
Você deverá fazer um dos seguintes procedimentos:
- Evitar o envio de propriedades
- Caso precise enviar propriedades, envie valores codificados como UTF-8.
Quando os Hubs de Eventos convertem propriedades do AMQP para o Kafka, ele inclui os tipos codificados do AMQP subjacentes em sua mensagem. Para obter mais informações sobre o comportamento, veja Troca de eventos entre consumidores e produtores usando protocolos diferentes.
No exemplo de código a seguir, quando o ponto de extremidade do fluxo de dados recebe o valor "foo":"bar", ele recebe a propriedade como <0xA1 0x03 "bar">.
using global::Azure.Messaging.EventHubs;
using global::Azure.Messaging.EventHubs.Producer;
var propertyEventBody = new BinaryData("payload");
var propertyEventData = new EventData(propertyEventBody)
{
Properties =
{
{"foo", "bar"},
}
};
var propertyEventAdded = eventBatch.TryAdd(propertyEventData);
await producerClient.SendAsync(eventBatch);
O ponto de extremidade de fluxo de dados não pode encaminhar a propriedade de conteúdo <0xA1 0x03 "bar"> para uma mensagem do MQTT porque os dados não são UTF-8. No entanto, se você especificar uma cadeia de caracteres UTF-8, o ponto de extremidade do fluxo de dados converterá a cadeia de caracteres antes de enviá-la ao MQTT. Se você usar uma cadeia de caracteres UTF-8, a mensagem MQTT terá "foo":"bar" como propriedades do usuário.
Somente os cabeçalhos UTF-8 são traduzidos. Por exemplo, dado o seguinte cenário em que a propriedade é definida como um float:
Properties =
{
{"float-value", 11.9 },
}
O ponto de extremidade do fluxo de dados descarta pacotes que contêm o campo "float-value".
Nem todas as propriedades de dados de eventos, incluindo propertyEventData.correlationId, são encaminhadas. Para obter mais informações, veja Propriedades do usuário do evento.
CloudEvents
CloudEvents são uma maneira de descrever dados de evento de uma maneira comum. As configurações do CloudEvents são usadas para enviar ou receber mensagens no formato do CloudEvents. Use os CloudEvents para arquiteturas orientadas por eventos em que diferentes serviços precisam se comunicar entre si nos mesmos provedores de nuvem ou em provedores de nuvem diferentes.
As opções de CloudEventAttributes são Propagate ou CreateOrRemap.
Na página de configurações do ponto de extremidade do fluxo de dados da experiência de operações, selecione a guia Avançado e use o campo Atributos do evento em nuvem para especificar a configuração do CloudEvents.
As seções a seguir descrevem como as propriedades do CloudEvent são propagadas ou criadas e remapeadas.
Propagar configuração
As propriedades do CloudEvent são passadas para mensagens que contêm as propriedades necessárias. Se a mensagem não contiver as propriedades necessárias, a mensagem será passada como está. Se as propriedades obrigatórias estiverem presentes, um prefixo ce_ será adicionado ao nome da propriedade do CloudEvent.
| Nome | Obrigatório | Valor de exemplo | Nome de saída | Valor de saída |
|---|---|---|---|---|
specversion |
Sim | 1.0 |
ce-specversion |
Passado como está |
type |
Sim | ms.aio.telemetry |
ce-type |
Passado como está |
source |
Sim | aio://mycluster/myoven |
ce-source |
Passado como está |
id |
Sim | A234-1234-1234 |
ce-id |
Passado como está |
subject |
Não | aio/myoven/sensor/temperature |
ce-subject |
Passado como está |
time |
Não | 2018-04-05T17:31:00Z |
ce-time |
Passado como está. Não é carimbado novamente. |
datacontenttype |
Não | application/json |
ce-datacontenttype |
Alterado para o tipo de conteúdo de dados de saída após o estágio de transformação opcional. |
dataschema |
Não | sr://fabrikam-schemas/123123123234234234234234#1.0.0 |
ce-dataschema |
Se um esquema de transformação de dados de saída for fornecido na configuração de transformação, dataschema será alterado para o esquema de saída. |
Configuração CreateOrRemap
As propriedades do CloudEvent são passadas para mensagens que contêm as propriedades necessárias. Se a mensagem não contiver as propriedades necessárias, as propriedades serão geradas.
| Nome | Obrigatório | Nome de saída | Valor gerado se ausente |
|---|---|---|---|
specversion |
Sim | ce-specversion |
1.0 |
type |
Sim | ce-type |
ms.aio-dataflow.telemetry |
source |
Sim | ce-source |
aio://<target-name> |
id |
Sim | ce-id |
UUID gerado no cliente de destino |
subject |
Não | ce-subject |
O tópico de saída para onde a mensagem é enviada |
time |
Não | ce-time |
Gerado como RFC 3339 no cliente de destino |
datacontenttype |
Não | ce-datacontenttype |
Alterado para o tipo de conteúdo de dados de saída após o estágio de transformação opcional |
dataschema |
Não | ce-dataschema |
Esquema definido no registro de esquema |
Próximas etapas
Para saber mais sobre fluxos de dados, consulte Criar um fluxo de dados.