March Developer Challenge – CloudEvents: Week 4

Last week we learnt more about Event-Driven Architecture and we successfully published a CloudEvent to SAP Integration Suite, advanced event mesh (AEM). This week we will build upon what we learnt last week and we will extend our program to subscribe to a topic. By subscribing to a topic we will be able to receive messages from AEM. Before we get to the challenge, we might need to expand a bit on some concepts. Let’s get started.

 wk4-data-flow.png

Week 4 – Data flow


Topics

Last week we mentioned that a topic is a means by which a publisher classifies a message. The topic tells us what type of message we will receive if we subscribe to a specific topic. In essence, it is a string that is composed of one or more levels. Each level is separated by a forward slash (/) and the levels can be anything. This is commonly known as topic-level granularity. The granularity allows for more targeted and efficient information exchange. Instead of having a single topic for all updates on a business object in a complex system (/BusinessPartner), the system can have distinct topics for different types of updates on a business object (/BusinessPartner/Created, /BusinessPartner/Updated, /BusinessPartner/Deleted). There is no specific schema/specification on how you need to structure your topic string but you do find that patterns are established within a system. Let’s get familiar with the structure of a topic by “dissecting” a real-world topic. Below we can see a topic on which an SAP S/4HANA Cloud system will publish a Business Partner message.

Example: sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/Created/v1:

  • sap/S4HANAOD/E4L: System information.
  • /ce: CloudEvent. We know that all events published by an SAP S/4HANA Cloud system follow the CloudEvents specification.
  • /sap/s4: This is coming from an SAP S/4HANA system.
  • /beh/businesspartner/v1/BusinessPartner: Information of the Business object that we will be receiving.
  • /Created: This is the action that took place in the source system. In this case, it is notifying us that a BusinessPartner was created. Many actions can take place in a system, e.g. this could be /Updated, /Deleted. In another case, if we were dealing with a business object like a PurchaseOrder, there could be an event raised when it is /Cancelled or /Rejected.
  • /v1: Version of the message. If a new version of the message is made available, e.g. adding new fields to the payload, then this will change.

In our case, we’ve defined levels on our topic string based on the week, SAP Community ID and action, e.g. dev-challenge/week-4/ajmaradiaga/notification.
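To make the idea of levels concrete, here is a minimal sketch that splits a topic string on the forward slash. The helper name topic_levels is made up for illustration and is not part of any SDK.

```python
from typing import List


def topic_levels(topic: str) -> List[str]:
    """Split a topic string into its levels, ignoring leading/trailing slashes."""
    return [level for level in topic.strip("/").split("/") if level]


topic = "sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/Created/v1"
levels = topic_levels(topic)

print(levels[:3])   # system information levels
print(levels[-2])   # the action level
print(levels[-1])   # the message version level
```

The same helper works for our challenge topics, e.g. dev-challenge/week-4/ajmaradiaga/notification has four levels, with the SAP Community ID at the third one.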

Now, by knowing the topic on which a message type will be published, we can create a consumer program/service that subscribes to the topic directly (aka topic endpoint) and processes the messages sent to it. Generally, you can subscribe to a topic by specifying the entire topic string when establishing the connection, e.g. sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/Created/v1. But what if we want to subscribe to all actions (Created, Updated, Deleted) that occur on a BusinessPartner object? Luckily, in the case of SAP Integration Suite, advanced event mesh, we can subscribe to the topic by using wildcards (*). For example, by subscribing to the topic sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner/*/v1 I will be able to get all messages for different actions (Created, Updated, Deleted) whose version is v1. In AEM, the > character can be used at the last level of a subscription to indicate a “one or more” wildcard match for any topics, e.g. subscribing to the topic sap/S4HANAOD/E4L/ce/sap/s4/beh/> will deliver all messages published under that prefix, independent of type, action, and version.

In the example above we can see how the topic level granularity can allow a consumer program/service to subscribe only to the information it needs. To learn more about wildcard characters in topic subscriptions 👉: https://help.pubsub.em.services.cloud.sap/Messaging/Wildcard-Charaters-Topic-Subs.htm
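The wildcard rules described above can be sketched in a few lines of Python. This is a simplified illustration and not how the broker implements matching: the function name matches is made up, and it only covers * within a single level and > as the last level of a subscription.

```python
def matches(subscription: str, topic: str) -> bool:
    """Return True if the topic matches the subscription (simplified rules)."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, sub_level in enumerate(sub_levels):
        if sub_level == ">" and i == len(sub_levels) - 1:
            # '>' at the last level: one or more remaining topic levels
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        if sub_level.endswith("*"):
            # '*' matches the rest of this level only, e.g. '*' or 'Busi*'
            if not topic_levels[i].startswith(sub_level[:-1]):
                return False
        elif sub_level != topic_levels[i]:
            return False

    # Without '>', both strings must have the same number of levels
    return len(sub_levels) == len(topic_levels)


prefix = "sap/S4HANAOD/E4L/ce/sap/s4/beh/businesspartner/v1/BusinessPartner"
print(matches(prefix + "/*/v1", prefix + "/Created/v1"))
print(matches(prefix + "/*/v1", prefix + "/Created/v2"))
print(matches("sap/S4HANAOD/E4L/ce/sap/s4/beh/>", prefix + "/Deleted/v1"))
```

For the authoritative rules, including edge cases this sketch ignores, refer to the wildcard documentation linked above.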

If our consumer program/service subscribes to a topic, we create a topic endpoint, and we will receive all messages for that topic subscription. That said, topic endpoints last only as long as the consumer is connected. The problem here is that our consumer needs to be online in order to receive a message. If the consumer becomes unavailable then we will end up losing messages. In some scenarios, this is unacceptable and we need to ensure that we receive and process all messages published. Fortunately, there is a mechanism to retain messages without the need for a consumer service to be online 100% of the time. The consumer can then process the messages asynchronously or whenever it is available. Enter Queues.

Queues

Queues allow us to subscribe to one or more topics and receive messages for all topics matching their subscriptions. The messages are received by the messaging system, saved in the queue and delivered to consuming clients if they are online and connected, or held in the queue until the consumer becomes available. Queues can provide exclusive access to one consumer or access to multiple consumers where messages are distributed among the consumers. A message stays in the queue until a consumer acknowledges that it has been processed. Only then will the message be removed from the queue.

wk4-guaranteed-queue.png

Queue

In the case of AEM, Queues can be durable or non-durable:

  • Durable queues exist until they are removed by an administrative action. They accumulate messages whether clients are online or offline. When offline clients reconnect, they receive all of the messages that accumulated while they were offline.
  • Temporary (or non-durable) queues follow the lifecycle of the client connection and are useful for ensuring message persistence while clients are online.
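The "keep until acknowledged" behaviour of a queue can be illustrated with a toy in-memory class. This is only a sketch of the concept: ToyQueue and its methods are invented for illustration and have nothing to do with the AEM or Solace APIs.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue (illustration only)."""

    def __init__(self):
        self._messages = deque()

    def enqueue(self, message):
        """The broker stores the message, whether or not a consumer is online."""
        self._messages.append(message)

    def peek(self):
        """Deliver the oldest message without removing it from the queue."""
        return self._messages[0] if self._messages else None

    def ack(self):
        """Remove the oldest message once the consumer confirms it was processed."""
        if self._messages:
            self._messages.popleft()

    def __len__(self):
        return len(self._messages)


queue = ToyQueue()

# Messages accumulate while the consumer is offline
queue.enqueue("BusinessPartner/Created")
queue.enqueue("BusinessPartner/Updated")

# The consumer comes online and works through the backlog
processed = []
while len(queue):
    message = queue.peek()
    processed.append(message)  # process the message...
    queue.ack()                # ...and only then acknowledge it

print(processed)
```

If the consumer crashed between peek() and ack(), the message would still be in the queue on the next attempt, which is the guarantee a direct topic subscription does not give us.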

Topic endpoint

As mentioned before, we can subscribe to a topic directly. A topic endpoint is created after establishing a connection to AEM (the messaging service – messaging_service.connect()) and subscribing to the topic (dev-challenge/week-4/[sapcommunityid]/processed – direct_receive_service.with_subscriptions(topics).build()). This is not a polling mechanism, but a running connection is required, through which AEM will send a message to your service. If your service is not online the message will be missed. See the code sample in the section below.

To learn more about Topic endpoints and Queues 👉: https://help.pubsub.em.services.cloud.sap/Get-Started/topic-endpoints-queues.htm

Week 4 challenge – Subscribe to topic in SAP Integration Suite, advanced event mesh

👉 Your task for this week is: Extend the program you created for last week’s challenge and subscribe to the dev-challenge/week-4/[sapcommunityid]/processed topic (this will create a topic endpoint). Your program should send a message to the topic dev-challenge/week-4/[sapcommunityid]/notification and expect a response on the following topic dev-challenge/week-4/[sapcommunityid]/processed. The response will contain a hash. Please share the hash value as a comment in the discussion.

Remember that we are creating a topic endpoint, which means that before sending the notification message (dev-challenge/week-4/[sapcommunityid]/notification), you need to ensure that the subscriber on your end is working and subscribed to dev-challenge/week-4/[sapcommunityid]/processed, as the processor service will handle the notification almost instantaneously. If your subscriber is not online, the message will be missed (“lost”) as no one will be listening on the topic.
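Why the order matters can be shown with a toy broker that, like a direct topic subscription, keeps nothing for absent subscribers. ToyDirectBroker is a made-up class for illustration only; it is not the Solace API.

```python
class ToyDirectBroker:
    """Delivers messages only to subscribers connected at publish time."""

    def __init__(self):
        self._subscribers = {}

    def subscribe(self, topic, callback):
        self._subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, message) -> int:
        """Deliver to current subscribers and return how many received it."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(message)
        return len(callbacks)


broker = ToyDirectBroker()
received = []
processed_topic = "dev-challenge/week-4/ajmaradiaga/processed"

# Published before anyone subscribed: nobody is listening, so it is lost
lost = broker.publish(processed_topic, "response 1")

# Subscribe first, then trigger the response
broker.subscribe(processed_topic, received.append)
delivered = broker.publish(processed_topic, "response 2")

print(lost, delivered, received)
```

In the challenge the same applies: start the receiver on the processed topic first, and only then publish to the notification topic.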

wk4-data-flow.png

Week 4 – Data flow

The diagram above explains the message processing. For example, in my case, I will publish the message to the topic dev-challenge/week-4/ajmaradiaga/notification and I will receive a response on the following topic dev-challenge/week-4/ajmaradiaga/processed. Below is an example of the response payload sent by the processor service to the dev-challenge/week-4/ajmaradiaga/processed topic:

{
  "id": "8dda9501-6379-4edc-91a7-6e78bec68746",
  "time": "2024-03-20T14:42:03.863Z",
  "type": "com.sap.dev-challenge.wk-4.processed.v1",
  "source": "https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/",
  "specversion": "1.0",
  "data": {
    "messageId": "a2e9ad2a-4955-4fb5-bd6c-91785548854b",
    "sapCommunityId": "ajmaradiaga",
    "hash": "285ed899bc5de8540a5ade05a673d60cbd68dc7bb2e21afd8507741a777e8393"
  }
}
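Once a response like the one above arrives, the hash can be read from the data attribute. A minimal sketch, assuming the payload is received as a JSON string (the variable names are illustrative):

```python
import json

# Example response payload, copied from the sample above
response_payload = """
{
  "id": "8dda9501-6379-4edc-91a7-6e78bec68746",
  "time": "2024-03-20T14:42:03.863Z",
  "type": "com.sap.dev-challenge.wk-4.processed.v1",
  "source": "https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/",
  "specversion": "1.0",
  "data": {
    "messageId": "a2e9ad2a-4955-4fb5-bd6c-91785548854b",
    "sapCommunityId": "ajmaradiaga",
    "hash": "285ed899bc5de8540a5ade05a673d60cbd68dc7bb2e21afd8507741a777e8393"
  }
}
"""

event = json.loads(response_payload)

# The hash to share in the discussion is nested under "data"
hash_value = event["data"]["hash"]
print(f"Hash for {event['data']['sapCommunityId']}: {hash_value}")
```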

To subscribe to the dev-challenge/week-4/[sapcommunityid]/processed topic you will need to create a topic endpoint. This can be achieved by using the Solace SDK available for your particular language. Similar to week 3, there is no need to reinvent the wheel here… check out the code available in the Solace Samples org, e.g. for Node.js, Python and the tutorials available for the different programming languages. There are plenty of examples there that show you how to use the different protocols.

In my case, I ended up using Python, for which there is a library available (solace-pubsubplus), and there is also a detailed guide for how to receive messages using the Python library. I connect to AEM using Solace Messaging. See some sample code below.

 

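import os
import time

from solace.messaging.messaging_service import MessagingService, RetryStrategy
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.resources.topic_subscription import TopicSubscription

SOLACE_TRANSPORT_PROTOCOL = os.getenv("SOLACE_SMF_TRANSPORT_PROTOCOL")
SOLACE_HOST = os.getenv("SOLACE_SMF_HOST")
SOLACE_PORT = os.getenv("SOLACE_SMF_PORT")
SOLACE_USERNAME = os.getenv("SOLACE_SMF_USERNAME")
SOLACE_PASSWORD = os.getenv("SOLACE_SMF_PASSWORD")
# Assumption: the Message VPN variable is not part of the original snippet;
# SOLACE_SMF_VPN is a hypothetical name.
SOLACE_VPN = os.getenv("SOLACE_SMF_VPN")

# Assumption: MAX_SLEEP is not defined in the original snippet; seconds per loop iteration.
MAX_SLEEP = 10

# Assumption: broker_props is not shown in the original snippet. The keys follow
# the property names used in the Solace Python samples.
broker_props = {
    "solace.messaging.transport.host": f"{SOLACE_TRANSPORT_PROTOCOL}://{SOLACE_HOST}:{SOLACE_PORT}",
    "solace.messaging.service.vpn-name": SOLACE_VPN,
    "solace.messaging.authentication.scheme.basic.username": SOLACE_USERNAME,
    "solace.messaging.authentication.scheme.basic.password": SOLACE_PASSWORD,
}


class MessageHandlerImpl(MessageHandler):
    """Assumption: the handler is not shown in the original snippet; this one prints each message."""

    def on_message(self, message: InboundMessage):
        # The payload arrives either as a string or as a byte array
        payload = message.get_payload_as_string() or message.get_payload_as_bytes()
        print(f"Received on {message.get_destination_name()}: {payload}")


def direct_message_consume(msg_service: MessagingService, consumer_subscription: str):
    """Create a direct receiver that handles str or byte array messages."""
    topics = [TopicSubscription.of(consumer_subscription)]

    direct_receive_service = msg_service.create_direct_message_receiver_builder()
    direct_receive_service = direct_receive_service.with_subscriptions(topics).build()
    try:
        direct_receive_service.start()
        direct_receive_service.receive_async(MessageHandlerImpl())
        print(f"Subscribed to: {consumer_subscription}")

        # Enter never ending loop to keep the receiver running
        while True:
            time.sleep(MAX_SLEEP)
    finally:
        direct_receive_service.terminate()
        msg_service.disconnect()


# Certificate validation is skipped here; fine for the challenge, not for production
transport_security = TLS.create().without_certificate_validation()

messaging_service = MessagingService.builder().from_properties(broker_props) \
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3)) \
    .with_transport_security_strategy(transport_security) \
    .build()

messaging_service.connect()

##########################
# Consuming from a topic #
##########################
CONSUMER_SUBSCRIPTION = "dev-challenge/week-4/ajmaradiaga/processed"

print("Execute Direct Consume - String")
direct_message_consume(messaging_service, CONSUMER_SUBSCRIPTION)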

 


🚨Credentials 🚨

To communicate with AEM, you will need to authenticate when posting a message. In the section below you can find the credentials required to connect.

For the adventurous out there, I’m also sharing the connection details for the different protocols that we can use to communicate with AEM, e.g. AMQP, Solace Messaging, Solace Web Messaging, in case you want to play around and get familiar with them.


  • Connection Type:
    • AMQP: amqps://mr-connection-plh11u5eu6a.messaging.solace.cloud:5671
    • Solace Messaging: tcps://mr-connection-plh11u5eu6a.messaging.solace.cloud:55443
    • Solace Web Messaging: wss://mr-connection-plh11u5eu6a.messaging.solace.cloud:443
  • Username: solace-cloud-client
  • Password: mcrtp5mps5q12lfqed5kfndbi2
  • Message VPN: eu-fr-devbroker
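The Python sample reads the protocol, host and port from separate environment variables. How those were populated is not shown, so the following is only a sketch of one way to derive them from a connection URL. The helper split_connection_url is hypothetical; the variable names are the ones read by the sample.

```python
from urllib.parse import urlsplit


def split_connection_url(url: str) -> dict:
    """Break a connection URL into protocol, host and port settings."""
    parts = urlsplit(url)
    return {
        "SOLACE_SMF_TRANSPORT_PROTOCOL": parts.scheme,
        "SOLACE_SMF_HOST": parts.hostname,
        "SOLACE_SMF_PORT": str(parts.port),
    }


# Solace Messaging connection URL from the list above
settings = split_connection_url(
    "tcps://mr-connection-plh11u5eu6a.messaging.solace.cloud:55443"
)
for name, value in settings.items():
    print(f"{name}={value}")
```

The username, password and Message VPN would be provided the same way, ideally through environment variables rather than hard-coded in the program.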

Validation process for this week’s challenge

As part of the validation process for this week’s challenge, there is a “processor service” that will process the messages sent. The processor program follows the same validation process as in week 3 and it will generate a response, which will be sent to the dev-challenge/week-4/[sapcommunityid]/processed topic.

You can monitor the messages processed by the processor service on this website: https://ce-dev-challenge-wk4.cfapps.eu10.hana.ondemand.com/messages-hash/webapp/index.html. Similar to last week, the monitoring app will also tell you if there are any errors in the message sent, e.g. not a valid CloudEvent, sapcommunityid extension context attribute missing, or sapcommunityid doesn’t match community Id specified in topic. The processor service is subscribed to the topics using a wildcard – dev-challenge/week-4/*/notification and will only generate hashes for valid payloads.
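To check a message locally before sending it, the three errors listed above can be approximated with a small function. This is a sketch under assumptions and not the processor service's actual logic: validate_notification is a made-up name, and "valid CloudEvent" is reduced here to the presence of the required CloudEvents 1.0 context attributes (id, source, specversion, type).

```python
from typing import List

REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def validate_notification(topic: str, event: dict) -> List[str]:
    """Return a list of error messages; an empty list means no errors found."""
    errors = []

    missing = [name for name in REQUIRED_ATTRIBUTES if name not in event]
    if missing:
        errors.append("not a valid CloudEvent: missing " + ", ".join(missing))

    if "sapcommunityid" not in event:
        errors.append("sapcommunityid extension context attribute missing")
    else:
        # Topic layout: dev-challenge/week-4/[sapcommunityid]/notification
        levels = topic.split("/")
        topic_id = levels[2] if len(levels) > 2 else None
        if event["sapcommunityid"] != topic_id:
            errors.append("sapcommunityid doesn't match community Id specified in topic")

    return errors


topic = "dev-challenge/week-4/ajmaradiaga/notification"
event = {
    "id": "a2e9ad2a-4955-4fb5-bd6c-91785548854b",
    "source": "https://example.invalid/my-program",  # placeholder value
    "specversion": "1.0",
    "type": "com.example.notification.v1",           # placeholder value
    "sapcommunityid": "ajmaradiaga",
}
print(validate_notification(topic, event))
```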

Below you can see a gif of a message processed successfully by the processor service.

wk4-solution.gif

 
