Compartilhar via


Padrão de caixa de saída transacional com o Azure Cosmos DB

Azure Cosmos DB
Barramento de Serviço do Azure
Azure Functions

Implementar mensagens confiáveis em sistemas distribuídos pode ser desafiador. Este artigo descreve como usar o padrão de Caixa de Saída Transacional para mensagens confiáveis e a entrega garantida de eventos, uma parte importante do suporte ao processamento de mensagens idempotente. Para fazer isso, você usará lotes transacionais do Azure Cosmos DB e o feed de alterações em combinação com o Barramento de Serviço do Azure.

Visão geral

As arquiteturas de microsserviço estão ganhando popularidade e mostram a promessa de resolver problemas como escalabilidade, manutenção e agilidade, especialmente em aplicativos grandes. Mas esse padrão de arquitetura também apresenta desafios quando se trata de manipulação de dados. Em aplicativos distribuídos, cada serviço mantém independentemente os dados necessários para operar em um armazenamento de dados dedicado de propriedade do serviço. Para dar suporte a esse cenário, você normalmente usa uma solução de mensagens como RabbitMQ, Kafka ou Barramento de Serviço do Azure que distribui dados (eventos) de um serviço por meio de um barramento de mensagens para outros serviços do aplicativo. Os consumidores internos ou externos podem assinar essas mensagens e ser notificados sobre as alterações assim que os dados forem manipulados.

Um exemplo conhecido nessa área é um sistema de ordenação: quando um usuário deseja criar um pedido, um Ordering serviço recebe dados de um aplicativo cliente por meio de um ponto de extremidade REST. Ele mapeia a carga para uma representação interna de um Order objeto para validar os dados. Após uma confirmação bem-sucedida no banco de dados, ele publica um OrderCreated evento em um barramento de mensagens. Qualquer outro serviço interessado em novos pedidos (por exemplo, um Inventory ou Invoicing serviço), assinaria OrderCreated mensagens, as processaria e as armazenaria em seu próprio banco de dados.

O pseudocódigo a seguir mostra como esse processo normalmente se parece da perspectiva do Ordering serviço:

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Essa abordagem funciona bem até que ocorra um erro entre salvar o objeto de ordem e publicar o evento correspondente. O envio de um evento pode falhar neste ponto por muitos motivos:

  • Erros de rede
  • Interrupção do serviço de mensagem
  • Falha do host

Seja qual for o erro, o resultado é que o OrderCreated evento não pode ser publicado no barramento de mensagens. Outros serviços não serão notificados de que um pedido foi criado. O Ordering serviço agora tem que cuidar de várias coisas que não se relacionam com o processo de negócios real. Ele precisa acompanhar os eventos que ainda precisam ser colocados no barramento de mensagens assim que ele estiver online novamente. Até o pior caso pode acontecer: inconsistências de dados no aplicativo devido a eventos perdidos.

Diagrama que mostra o tratamento de eventos sem o padrão de Caixa de Saída Transacional.

Solução

Há um padrão conhecido chamado Caixa de Saída Transacional que pode ajudá-lo a evitar essas situações. Ele garante que os eventos sejam salvos em um armazenamento de dados (normalmente em uma tabela outbox em seu banco de dados) antes que eles sejam enviados por push para um agente de mensagens. Se o objeto de negócios e os eventos correspondentes forem salvos na mesma transação de banco de dados, é garantido que nenhum dado será perdido. Tudo será confirmado ou tudo será revertido se houver um erro. Para eventualmente publicar o evento, um serviço ou processo de trabalho diferente consulta a tabela Outbox para entradas sem tratamento, publica os eventos e os marca como processados. Esse padrão garante que os eventos não serão perdidos depois que um objeto de negócios for criado ou modificado.

Diagrama que mostra o tratamento de eventos com o padrão de Caixa de Saída Transacional e um serviço de retransmissão para publicar eventos no agente de mensagens.

Baixe um arquivo do Visio dessa arquitetura.

Em um banco de dados relacional, a implementação do padrão é simples. Se o serviço usar o Entity Framework Core, por exemplo, ele usará um contexto do Entity Framework para criar uma transação de banco de dados, salvar o objeto de negócios e o evento e confirmar a transação ou fazer uma reversão. Além disso, o serviço de trabalho que está processando eventos é fácil de implementar: consulta periodicamente a tabela Outbox para novas entradas, publica eventos recém-inseridos no barramento de mensagens e, por fim, marca essas entradas como processadas.

Na prática, as coisas não são tão fáceis quanto parecem primeiro. Mais importante, você precisa garantir que a ordem dos eventos seja preservada para que um OrderUpdated evento não seja publicado antes de um OrderCreated evento.

Implementação no Azure Cosmos DB

Esta seção mostra como implementar o padrão de Caixa de Saída Transacional no Azure Cosmos DB para obter mensagens confiáveis e em ordem entre diferentes serviços com a ajuda do feed de alterações do Azure Cosmos DB e do Barramento de Serviço. Ele demonstra um serviço de exemplo que gerencia Contact objetos (FirstName, , LastName, EmailCompany informações e assim por diante). Ele usa o padrão CQRS (Segregação de Responsabilidade de Comando e Consulta) e segue conceitos básicos de DDD (design controlado por domínio). Você pode encontrar o código de exemplo para a implementação no GitHub.

Um Contact objeto no serviço de exemplo tem a seguinte estrutura:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Assim que um Contact é criado ou atualizado, ele emite eventos que contêm informações sobre a alteração atual. Entre outros, os eventos de domínio podem ser:

  • ContactCreated. Gerado quando um contato é adicionado.
  • ContactNameUpdated. Gerado quando FirstName ou LastName alterado.
  • ContactEmailUpdated. Gerado quando o endereço de email é atualizado.
  • ContactCompanyUpdated. Gerado quando qualquer uma das propriedades da empresa é alterada.

Lotes transacionais

Para implementar esse padrão, você precisa garantir que o Contact objeto de negócios e os eventos correspondentes serão salvos na mesma transação de banco de dados. No Azure Cosmos DB, as transações funcionam de forma diferente do que funcionam em sistemas de banco de dados relacionais. As transações do Azure Cosmos DB, chamadas de lotes transacionais, operam em uma única partição lógica, para que garantam propriedades atomicidade, consistência, isolamento e durabilidade (ACID). Você não pode salvar dois documentos em uma operação de lote transacional em contêineres ou partições lógicas diferentes. Para o serviço de exemplo, isso significa que o objeto de negócios e o evento ou eventos serão colocados no mesmo contêiner e partição lógica.

Contexto, repositórios e UnitOfWork

O núcleo da implementação de exemplo é um contexto de contêiner que mantém o controle de objetos que são salvos no mesmo lote transacional. Ele mantém uma lista de objetos criados e modificados e opera em um único contêiner do Azure Cosmos DB. A interface para ele tem esta aparência:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

A lista no componente de contexto do contêiner rastreia Contact e DomainEvent objetos. Ambos serão colocados no mesmo contêiner. Isso significa que vários tipos de objetos são armazenados no mesmo contêiner do Azure Cosmos DB e usam uma Type propriedade para distinguir entre um objeto de negócios e um evento.

Para cada tipo, há um repositório dedicado que define e implementa o acesso a dados. A Contact interface do repositório fornece estes métodos:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

O Event repositório é semelhante, exceto que há apenas um método, que cria novos eventos no repositório:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

As implementações de ambas as interfaces de repositório obtêm uma referência por meio da injeção de dependência para uma única IContainerContext instância para garantir que ambas operem no mesmo contexto do Azure Cosmos DB.

O último componente é UnitOfWork, que confirma as alterações mantidas na instância no IContainerContext Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Tratamento de eventos: Criação e publicação

Sempre que um Contact objeto é criado, modificado ou (soft-) excluído, o serviço gera um evento correspondente. O núcleo da solução fornecida é uma combinação de DDD (design controlado pelo domínio) e o padrão de mediador proposto por Jimmy Bogard. Ele sugere manter uma lista de eventos que ocorreram devido a modificações do objeto de domínio e publicação desses eventos antes de salvar o objeto real no banco de dados.

A lista de alterações é mantida no próprio objeto de domínio para que nenhum outro componente possa modificar a cadeia de eventos. O comportamento da manutenção de eventos (IEvent instâncias) no objeto de domínio é definido por meio de uma interface IEventEmitter<IEvent> e implementado em uma classe abstrata DomainEntity :

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

O Contact objeto gera eventos de domínio. A Contact entidade segue os conceitos básicos de DDD, configurando os setters das propriedades de domínio como privados. Não existem setters públicos na classe. Em vez disso, ele oferece métodos para manipular o estado interno. Nesses métodos, os eventos apropriados para uma determinada modificação (por exemplo ContactNameUpdated ou ContactEmailUpdated) podem ser gerados.

Aqui está um exemplo de atualizações para o nome de um contato. (O evento é gerado no final do método.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return; // if an object is newly created, all modifications will be handled by ContactCreatedEvent

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

O correspondente ContactNameUpdatedEvent, que acompanha as alterações, tem esta aparência:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Até agora, os eventos são apenas registrados no objeto de domínio e nada é salvo no banco de dados ou até mesmo publicado em um agente de mensagens. Seguindo a recomendação, a lista de eventos será processada logo antes que o objeto de negócios seja salvo no armazenamento de dados. Nesse caso, isso ocorre no SaveChangesAsync método da IContainerContext instância, que é implementado em um método privado RaiseDomainEvents . (dObjs é a lista de entidades controladas do contexto do contêiner.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

Na última linha, o pacote MediatR , uma implementação do padrão do mediador em C#, é usado para publicar um evento dentro do aplicativo. Isso é possível porque todos os eventos como ContactNameUpdatedEvent implementar a INotification interface do pacote MediatR.

Esses eventos precisam ser processados por um manipulador correspondente. Aqui, a IEventsRepository implementação entra em jogo. Aqui está o exemplo do manipulador de NameUpdated eventos:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

Uma IEventRepository instância é injetada na classe de manipulador por meio do construtor. Assim que um ContactNameUpdatedEvent é publicado no serviço, o Handle método é invocado e usa a instância do repositório de eventos para criar um objeto de notificação. Esse objeto de notificação, por sua vez, é inserido na lista de objetos rastreados no IContainerContext objeto e une os objetos que são salvos no mesmo lote transacional no Azure Cosmos DB.

Até agora, o contexto do contêiner sabe quais objetos processar. Para eventualmente persistir os objetos rastreados no Azure Cosmos DB, a IContainerContext implementação cria o lote transacional, adiciona todos os objetos relevantes e executa a operação no banco de dados. O processo descrito é tratado no SaveInTransactionalBatchAsync método, que é invocado pelo SaveChangesAsync método.

Aqui estão as partes importantes da implementação que você precisa para criar e executar o lote transacional:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Aqui está uma visão geral de como o processo funciona até agora (para atualizar o nome em um objeto de contato):

  1. Um cliente deseja atualizar o nome de um contato. O SetName método é invocado no objeto de contato e as propriedades são atualizadas.
  2. O ContactNameUpdated evento é adicionado à lista de eventos no objeto de domínio.
  3. O método do repositório de Update contatos é invocado, o que adiciona o objeto de domínio ao contexto do contêiner. O objeto agora é rastreado.
  4. CommitAsync é invocado na UnitOfWork instância, que, por sua vez, chama SaveChangesAsync o contexto do contêiner.
  5. Dentro SaveChangesAsync, todos os eventos na lista do objeto de domínio são publicados por uma MediatR instância e são adicionados por meio do repositório de eventos ao mesmo contexto de contêiner.
  6. Em SaveChangesAsync, um TransactionalBatch é criado. Ele manterá o objeto de contato e o evento.
  7. As TransactionalBatch execuções e os dados são confirmados no Azure Cosmos DB.
  8. SaveChangesAsync e CommitAsync retorne com êxito.

Persistência

Como você pode ver nos snippets de código anteriores, todos os objetos salvos no Azure Cosmos DB são encapsulados em uma DataObject instância. Este objeto fornece propriedades comuns:

  • ID.
  • PartitionKey.
  • Type.
  • State. Assim Createdcomo, Updated não será persistente no Azure Cosmos DB.
  • Etag. Para bloqueio otimista.
  • TTL. Propriedade Time To Live para limpeza automática de documentos antigos.
  • Data. Objeto de dados genérico.

Essas propriedades são definidas em uma interface genérica chamada IDataObject e usada pelos repositórios e pelo contexto do contêiner:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Os objetos encapsulados em uma DataObject instância e salvos no banco de dados serão semelhantes a este exemplo (Contact e ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Você pode ver que os Contact documentos e ContactNameUpdatedEvent (tipo domainEvent) têm a mesma chave de partição e que ambos os documentos serão persistidos na mesma partição lógica.

Processamento do feed de alterações

Para ler o fluxo de eventos e enviá-los para um agente de mensagens, o serviço usará o feed de alterações do Azure Cosmos DB.

O feed de alterações é um log persistente de alterações em seu contêiner. Ele opera em segundo plano e rastreia as modificações. Em uma partição lógica, a ordem das alterações é garantida. A maneira mais conveniente de ler o feed de alterações é usar uma função do Azure com um gatilho do Azure Cosmos DB. Outra opção é usar a biblioteca de processadores do feed de alterações. Ele permite integrar o processamento de feed de alterações em sua API Web como um serviço em segundo plano (por meio da IHostedService interface). O exemplo aqui usa um aplicativo de console simples que implementa a classe abstrata BackgroundService para hospedar tarefas em segundo plano de execução prolongada em aplicativos .NET Core.

Para receber as alterações do feed de alterações do Azure Cosmos DB, você precisa criar uma instância de um ChangeFeedProcessor objeto, registrar um método de manipulador para processamento de mensagens e começar a escutar as alterações:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

Um método de manipulador (HandleChangesAsync aqui) processa as mensagens. Neste exemplo, os eventos são publicados em um tópico do Barramento de Serviço particionado para escalabilidade e tem o recurso de eliminação de duplicação habilitado. Qualquer serviço interessado em alterações Contact em objetos pode assinar esse tópico do Barramento de Serviço e receber e processar as alterações para seu próprio contexto.

As mensagens do Barramento de Serviço produzidas têm uma SessionId propriedade. Ao usar sessões no Barramento de Serviço, você garante que a ordem das mensagens seja preservada (primeiro a entrar, primeiro a sair (FIFO)). A preservação da ordem é necessária para esse caso de uso.

Este é o snippet que manipula mensagens do feed de alterações:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Tratamento de erros

Se houver um erro enquanto as alterações estiverem sendo processadas, a biblioteca de feed de alterações reiniciará a leitura de mensagens na posição em que processou com êxito o último lote. Por exemplo, se o aplicativo tiver processado 10.000 mensagens com êxito, agora está trabalhando no lote 10.001 a 10.025 e ocorre um erro, ele pode reiniciar e pegar seu trabalho na posição 10.001. A biblioteca controla automaticamente o que foi processado por meio de informações salvas em um Leases contêiner no Azure Cosmos DB.

É possível que o serviço já tenha enviado algumas das mensagens reprocessadas para o Barramento de Serviço. Normalmente, esse cenário levaria ao processamento duplicado de mensagens. Conforme observado anteriormente, o Barramento de Serviço tem um recurso para detecção de mensagens duplicadas que você precisa habilitar para esse cenário. O serviço verifica se uma mensagem já foi adicionada a um tópico do Barramento de Serviço (ou fila) com base na propriedade controlada pelo MessageId aplicativo da mensagem. Essa propriedade é definida como o ID documento do evento. Se a mesma mensagem for enviada novamente ao Barramento de Serviço, o serviço a ignorará e a descartará.

Manutenção

Em uma implementação típica da Caixa de Saída Transacional, o serviço atualiza os eventos manipulados e define uma Processed propriedade como true, indicando que uma mensagem foi publicada com êxito. Esse comportamento pode ser implementado manualmente no método de manipulador. No cenário atual, não há necessidade de tal processo. O Azure Cosmos DB rastreia eventos que foram processados usando o feed de alterações (em combinação com o Leases contêiner).

Como última etapa, ocasionalmente, você precisa excluir os eventos do contêiner para manter apenas os registros/documentos mais recentes. Para fazer uma limpeza periodicamente, a implementação aplica outro recurso do Azure Cosmos DB: Vida Útil (TTL) em documentos. O Azure Cosmos DB pode excluir automaticamente documentos com base em uma TTL propriedade que pode ser adicionada a um documento: um período de tempo em segundos. O serviço verificará constantemente o contêiner em busca de documentos que tenham uma TTL propriedade. Assim que um documento expirar, o Azure Cosmos DB o removerá do banco de dados.

Quando todos os componentes funcionam conforme o esperado, os eventos são processados e publicados rapidamente: em segundos. Se houver um erro no Azure Cosmos DB, os eventos não serão enviados para o barramento de mensagens, pois o objeto comercial e os eventos correspondentes não podem ser salvos no banco de dados. A única coisa a considerar é definir um valor apropriado TTL nos documentos quando o trabalho em DomainEvent segundo plano (processador de feed de alterações) ou o barramento de serviço não estiver disponível. Em um ambiente de produção, é melhor escolher um período de tempo de vários dias. Por exemplo, 10 dias. Todos os componentes envolvidos terão tempo suficiente para processar/publicar alterações no aplicativo.

Resumo

O padrão de Caixa de Saída Transacional resolve o problema de publicar eventos de domínio de forma confiável em sistemas distribuídos. Ao confirmar o estado do objeto comercial e seus eventos no mesmo lote transacional e usar um processador em segundo plano como uma retransmissão de mensagens, você garante que outros serviços, internos ou externos, eventualmente receberão as informações de que dependem. Este exemplo não é uma implementação tradicional do padrão de Caixa de Saída Transacional. Ele usa recursos como o feed de alterações do Azure Cosmos DB e o Time To Live que mantêm as coisas simples e limpas.

Aqui está um resumo dos componentes do Azure usados neste cenário:

Diagrama que mostra os componentes do Azure para implementar a Caixa de Saída Transacional com o Azure Cosmos DB e o Barramento de Serviço do Azure.

Baixe um arquivo do Visio dessa arquitetura.

As vantagens dessa solução são:

  • Mensagens confiáveis e entrega garantida de eventos.
  • Ordem preservada de eventos e eliminação de duplicação de mensagens por meio do Barramento de Serviço.
  • Não é necessário manter uma propriedade extra Processed que indique o processamento bem-sucedido de um documento de evento.
  • Exclusão de eventos do Azure Cosmos DB por meio de TTL (vida útil). O processo não consome unidades de solicitação necessárias para lidar com solicitações de usuário/aplicativo. Em vez disso, ele usa unidades de solicitação "sobras" em uma tarefa em segundo plano.
  • Processamento à prova de erros de mensagens por meio ChangeFeedProcessor (ou de uma função do Azure).
  • Opcional: vários processadores de feed de alterações, cada um mantendo seu próprio ponteiro no feed de alterações.

Considerações

O aplicativo de exemplo discutido neste artigo demonstra como você pode implementar o padrão de Caixa de Saída Transacional no Azure com o Azure Cosmos DB e o Barramento de Serviço. Há também outras abordagens que usam bancos de dados NoSQL. Para garantir que o objeto de negócios e os eventos serão salvos de forma confiável no banco de dados, você pode inserir a lista de eventos no documento do objeto empresarial. A desvantagem dessa abordagem é que o processo de limpeza precisará atualizar cada documento que contenha eventos. Isso não é ideal, especialmente em termos de custo de Unidade de Solicitação, em comparação com o uso de TTL.

Tenha em mente que você não deve considerar o código de exemplo fornecido aqui com código pronto para produção. Ele tem algumas limitações em relação ao multithreading, especialmente a maneira como os eventos são tratados na classe e como os DomainEntity objetos são acompanhados nas CosmosContainerContext implementações. Use-o como ponto de partida para suas próprias implementações. Como alternativa, considere o uso de bibliotecas existentes que já têm essa funcionalidade incorporada a elas, como NServiceBus ou MassTransit.

Implantar esse cenário

Você pode encontrar o código-fonte, os arquivos de implantação e as instruções para testar este cenário no GitHub: https://github.com/Azure-Samples/transactional-outbox-pattern.

Contribuidores

Este artigo é mantido pela Microsoft. Foi originalmente escrito pelos colaboradores a seguir.

Autor principal:

Para ver perfis não públicos do LinkedIn, entre no LinkedIn.

Próximas etapas

Leia estes artigos para saber mais: