Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo descreve o modelo de programação assíncrona no SDK do Azure para Java.
Inicialmente, o SDK do Azure continha apenas APIs assíncronas sem bloqueio para interagir com os serviços do Azure. Essas APIs permitem que você use o SDK do Azure para criar aplicativos escalonáveis que usam recursos do sistema com eficiência. No entanto, o SDK do Azure para Java também contém clientes síncronos para atender a um público mais amplo e também tornar nossas bibliotecas de clientes acessíveis para usuários que não estão familiarizados com programação assíncrona. (Consulte Approachable nas diretrizes de design do SDK do Azure.) Dessa forma, todas as bibliotecas Java cliente no SDK do Azure para Java oferecem clientes assíncronos e síncronos. No entanto, é recomendável usar os clientes assíncronos para sistemas de produção para maximizar o uso de recursos do sistema.
Fluxos reativos
Se você examinar a seção Clientes de serviço assíncrono nas Diretrizes de design do SDK do Azure para Java, observará que, em vez de usar CompletableFuture fornecido pelo Java 8, nossas APIs assíncronas usarão tipos reativos. Por que escolhemos tipos reativos em vez de tipos que estão disponíveis nativamente no JDK?
O Java 8 introduziu recursos como Streams, Lambdas e CompletableFuture. Essas funcionalidades fornecem muitas possibilidades, mas têm algumas limitações.
CompletableFuture fornece recursos baseados em retorno de chamada, sem bloqueio, e a interface CompletionStage permitiu a fácil composição de uma série de operações assíncronas. Lambdas tornam essas APIs baseadas em push mais legíveis. Os fluxos fornecem operações de estilo funcional para lidar com uma coleção de elementos de dados. No entanto, os fluxos são síncronos e não podem ser reutilizados.
CompletableFuture permite que você faça uma única solicitação, fornece suporte para um retorno de chamada e espera uma única resposta. No entanto, muitos serviços de nuvem exigem a capacidade de transmitir dados – Hubs de Eventos, por exemplo.
Fluxos reativos podem ajudar a superar essas limitações transmitindo elementos de uma origem para um assinante. Quando um assinante solicita dados de uma fonte, a fonte envia qualquer número de resultados de volta. Ele não precisa enviá-los todos de uma vez. A transferência ocorre durante um período de tempo, sempre que a fonte tem dados a serem enviados.
Nesse modelo, o assinante registra manipuladores de eventos para processar dados quando eles chegam. Essas interações baseadas em push notificam o assinante por meio de sinais distintos:
- Uma
onSubscribe()chamada indica que a transferência de dados está prestes a começar. - Uma
onError()chamada indica que houve um erro, que também marca o fim da transferência de dados. - Uma
onComplete()chamada indica a conclusão bem-sucedida da transferência de dados.
Ao contrário dos Fluxos de Java, os fluxos reativos tratam os erros como eventos de primeira classe. Fluxos reativos têm um canal dedicado para a origem para comunicar todos os erros ao assinante. Além disso, os fluxos reativos permitem que o assinante negocie a taxa de transferência de dados para transformar esses fluxos em um modelo de push-pull.
A especificação de Fluxos Reativos fornece um padrão de como a transferência de dados deve ocorrer. Em um alto nível, a especificação define as quatro interfaces a seguir e especifica regras sobre como essas interfaces devem ser implementadas.
- O Publisher é a fonte de um fluxo de dados.
- O assinante é o consumidor de um fluxo de dados.
- A assinatura gerencia o estado da transferência de dados entre um publicador e um assinante.
- Processor é tanto editor quanto assinante.
Há algumas bibliotecas Java conhecidas que fornecem implementações dessa especificação, como RxJava, Akka Streams, Vert.x e Project Reactor.
O SDK do Azure para Java adotou o Project Reactor para oferecer suas APIs assíncronas. O principal fator que conduziu essa decisão foi proporcionar uma integração suave com a Spring Webflux, que também usa o Project Reactor. Outro fator contribuinte para escolher o Project Reactor em vez de RxJava foi que o Project Reactor usa Java 8, mas rxJava, na época, ainda estava no Java 7. O Project Reactor também oferece um conjunto avançado de operadores componíveis que permitem que você escreva código declarativo para construir pipelines de processamento de dados. Outra coisa interessante sobre o Project Reactor é que ele tem adaptadores para converter tipos do Project Reactor em outros tipos de implementação populares.
Comparando APIs de operações síncronas e assíncronas
Discutimos os clientes síncronos e as opções para clientes assíncronos. A tabela a seguir resume a aparência das APIs que foram projetadas usando estas opções:
| Tipo de API | Sem valor | Valor único | Vários valores |
|---|---|---|---|
| Java Padrão – APIs síncronas | void |
T |
Iterable<T> |
| Java Padrão – APIs assíncronas | CompletableFuture<Void> |
CompletableFuture<T> |
CompletableFuture<List<T>> |
| Interfaces de fluxos reativos | Publisher<Void> |
Publisher<T> |
Publisher<T> |
| Implementação da Project Reactor de fluxos reativos | Mono<Void> |
Mono<T> |
Flux<T> |
Para fins de integridade, vale a pena mencionar que o Java 9 introduziu a classe Flow que inclui as quatro interfaces de fluxos reativos. No entanto, essa classe não inclui nenhuma implementação.
Usar APIs assíncronas no SDK do Azure para Java
A especificação de fluxos reativos não diferencia os tipos de publicadores. Na especificação de fluxos reativos, os editores simplesmente produzem zero ou mais elementos de dados. Em muitos casos, há uma distinção útil entre um editor que produz no máximo um elemento de dados versus um que produz zero ou mais. Em APIs baseadas em nuvem, essa distinção indica se uma solicitação retorna uma resposta com valor único ou uma coleção. O Project Reactor fornece dois tipos para fazer essa distinção : Mono e Flux. Uma API que retorna um Mono conterá uma resposta com no máximo um valor, e uma API que retorna um Flux conterá uma resposta com zero ou mais valores.
Por exemplo, suponha que você use um ConfigurationAsyncClient para recuperar uma configuração armazenada usando o serviço de Configuração de Aplicativos do Azure. (Para obter mais informações, consulte o que é a Configuração de Aplicativos do Azure?.)
Se você criar um ConfigurationAsyncClient e chamar getConfigurationSetting() no cliente, ele retornará um Mono, que indica que a resposta contém um único valor. No entanto, chamar esse método sozinho não faz nada. O cliente ainda não fez uma solicitação para o serviço de Configuração de Aplicativos do Azure. Nessa fase, o Mono<ConfigurationSetting> retornado por essa API é apenas um "assembly" do pipeline de processamento de dados. O que isso significa é que a configuração necessária para consumir os dados está concluída. Para efetivamente iniciar a transferência de dados (ou seja, para fazer a solicitação ao serviço e obter a resposta), você deve se inscrever no elemento retornado Mono. Portanto, ao lidar com esses fluxos reativos, você deve se lembrar de chamar subscribe() porque nada acontece até que você faça isso.
O exemplo a seguir mostra como se inscrever em Mono e imprimir o valor de configuração no console.
ConfigurationAsyncClient asyncClient = new ConfigurationClientBuilder()
.connectionString("<your connection string>")
.buildAsyncClient();
asyncClient.getConfigurationSetting("<your config key>", "<your config value>").subscribe(
config -> System.out.println("Config value: " + config.getValue()),
ex -> System.out.println("Error getting configuration: " + ex.getMessage()),
() -> System.out.println("Successfully retrieved configuration setting"));
System.out.println("Done");
Observe que, depois de chamar getConfigurationSetting() no cliente, o código de exemplo assina o resultado e fornece três lambdas separados. O primeiro lambda consome dados recebidos do serviço, que são disparados após uma resposta bem-sucedida. O segundo lambda será disparado se houver um erro ao recuperar a configuração. O terceiro lambda é invocado quando o fluxo de dados é concluído, o que significa que não são esperados mais elementos de dados desse fluxo.
Observação
Assim como acontece com toda a programação assíncrona, depois que a assinatura é criada, a execução continua normalmente. Se não houver nada para manter o programa ativo e em execução, ele poderá ser encerrado antes da conclusão da operação assíncrona. O thread principal chamado subscribe() não aguardará até que você faça a chamada de rede para a Configuração de Aplicativos do Azure e receba uma resposta. Em sistemas de produção, você pode continuar a processar outra coisa, mas neste exemplo você pode adicionar um pequeno atraso chamando Thread.sleep() ou usando uma CountDownLatch para dar à operação assíncrona uma chance de ser concluída.
Conforme mostrado no exemplo a seguir, as APIs que retornam um Flux também seguem um padrão semelhante. A diferença é que o primeiro callback fornecido ao método subscribe() é chamado várias vezes para cada elemento de dados na resposta. O erro ou os retornos de chamada de conclusão são chamados exatamente uma vez e são considerados como sinais de terminal. Nenhum outro retorno de chamada será invocado se um desses sinais for recebido do editor.
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(
event -> System.out.println("Sequence number of received event: " + event.getData().getSequenceNumber()),
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
Contrapressão
O que acontece quando a origem está produzindo os dados a uma taxa mais rápida do que o assinante pode lidar? O assinante pode sofrer uma sobrecarga de dados, o que pode levar a erros de memória insuficiente. O assinante precisa de uma forma de se comunicar de volta com o editor para reduzir o ritmo quando não conseguir acompanhar. Por padrão, quando você chama subscribe() em um Flux conforme mostrado no exemplo acima, o assinante está solicitando um fluxo ilimitado de dados, indicando ao publicador para enviar os dados o mais rápido possível. Esse comportamento nem sempre é desejável, e o assinante pode precisar controlar a taxa de publicação por meio de "contrapressão". A contrapressão permite que o assinante assuma o controle do fluxo de elementos de dados. Um assinante solicitará um número limitado de elementos de dados que ele pode manipular. Depois que o assinante concluir o processamento desses elementos, o assinante poderá solicitar mais. Ao usar a contrapressão, você pode transformar um modelo push para transferência de dados em um modelo de push-pull.
O exemplo a seguir mostra como você pode controlar a taxa na qual os eventos são recebidos pelo consumidor dos Hubs de Eventos:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.request(1); // request another event when the subscriber is ready
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Quando o assinante "se conecta" pela primeira vez ao publicador, o publicador entrega ao assinante uma Subscription instância, que gerencia o estado da transferência de dados. Esse Subscription é o meio pelo qual o assinante pode aplicar contrapressão chamando request() para especificar quantos mais elementos de dados ele pode processar.
Se o assinante solicitar mais de um elemento de dados cada vez que ele chamar onNext(), request(10) por exemplo, o editor enviará os próximos 10 elementos imediatamente se estiverem disponíveis ou quando estiverem disponíveis. Esses elementos se acumulam em um buffer na extremidade do assinante e, como cada chamada onNext() solicitará mais 10, a lista de pendências continuará crescendo até o editor não ter mais elementos de dados para enviar ou ocorrer um estouro de buffer no assinante, resultando em erros de memória insuficiente.
Cancelar uma assinatura
Uma assinatura gerencia o estado da transferência de dados entre um publicador e um assinante. A assinatura estará ativa até que o publicador tenha concluído a transferência de todos os dados para o assinante ou o assinante não esteja mais interessado em receber dados. Há algumas maneiras de cancelar uma assinatura, conforme mostrado abaixo.
O exemplo a seguir cancela a assinatura descartando o assinante:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
Disposable disposable = asyncClient.receive().subscribe(
partitionEvent -> {
Long num = partitionEvent.getData().getSequenceNumber()
System.out.println("Sequence number of received event: " + num);
},
ex -> System.out.println("Error receiving events: " + ex.getMessage()),
() -> System.out.println("Successfully completed receiving all events"));
// much later on in your code, when you are ready to cancel the subscription,
// you can call the dispose method, as such:
disposable.dispose();
O exemplo a seguir cancela a assinatura chamando o cancel() método em Subscription:
EventHubConsumerAsyncClient asyncClient = new EventHubClientBuilder()
.connectionString("<your connection string>")
.consumerGroup("<your consumer group>")
.buildAsyncConsumerClient();
asyncClient.receive().subscribe(new Subscriber<PartitionEvent>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1); // request 1 data element to begin with
}
@Override
public void onNext(PartitionEvent partitionEvent) {
System.out.println("Sequence number of received event: " + partitionEvent.getData().getSequenceNumber());
this.subscription.cancel(); // Cancels the subscription. No further event is received.
}
@Override
public void onError(Throwable throwable) {
System.out.println("Error receiving events: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Successfully completed receiving all events")
}
});
Conclusão
Os threads são recursos caros que você não deve desperdiçar ao aguardar respostas de chamadas de serviço remoto. À medida que a adoção de arquiteturas de microsserviços aumenta, a necessidade de dimensionar e usar recursos com eficiência se torna vital. As APIs assíncronas são favoráveis quando há operações dependentes da rede. O SDK do Azure para Java oferece um conjunto avançado de APIs para operações assíncronas para ajudar a maximizar os recursos do sistema. Recomendamos fortemente que você experimente nossos clientes assíncronos.
Para obter mais informações sobre os operadores que melhor se adaptam às suas tarefas específicas, consulte Qual operador preciso? No Guia de Referência do Reator 3.
Próximas etapas
Agora que você compreende melhor os vários conceitos de programação assíncrona, é importante aprender a iterar os resultados. Para obter mais informações sobre as melhores estratégias de iteração e detalhes de como a paginação funciona, consulte Paginação e iteração no SDK do Azure para Java.