In today's microservices and distributed systems, event-driven architectures have become a popular way to design systems. They allow services to react to changes in other services without tight coupling. However, one of the challenges with this approach is the lack of a standard contract for events. This can lead to inconsistencies and misunderstandings between services, such as different services interpreting the same event data differently.
This is where CloudEvents comes in. CloudEvents is a specification for describing event data in a common way. It aims to ease event declaration and delivery across services, platforms, and beyond.
In this post, we'll explore how to use CloudEvents in Go, specifically with NATS — a popular event-driven messaging system known for its high performance and simplicity — and with Kafka, known for its fault-tolerance and scalability.
"By using CloudEvents, we've created a common contract for our events. Any service that understands CloudEvents can interact with our service, regardless of the language or platform it's built on."
Setting Up
First, we need to define our configuration. We'll use environment variables for this:
type Config struct {
// NATSUrl is the url to the nats server
NATSUrl string `envconfig:"NATS_URL" required:"true"`
// Subject is the nats subject to publish cloudevents on.
Subject string `envconfig:"SUBJECT" required:"true"`
}
In our main function, we'll load these environment variables into a Config struct. This is a common practice in Go, where we use the envconfig package to automatically populate the struct fields with values from the environment variables.
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatalf("Failed to process env var: %s", err)
}
Connecting to NATS
Next, we'll connect to NATS using the CloudEvents NATS protocol. This protocol is specifically designed to use CloudEvents with NATS, providing a standardized way to publish and subscribe to events.
p, err := cenats.NewConsumer(env.NATSUrl, env.Subject, cenats.NatsOptions())
if err != nil {
log.Fatalf("Failed to create protocol: %v", err)
}
defer p.Close(ctx)
Creating a CloudEvents Client
Now that we have a connection to NATS, we can create a CloudEvents client. This client is a key component in our implementation, allowing us to send and receive CloudEvents over NATS. It abstracts away the complexities of the underlying messaging system, allowing us to focus on the business logic of our application.
ceClient, err := cloudevents.NewClient(p)
if err != nil {
log.Fatalf("Failed to create client, %v", err)
}
With this client, we can now send and receive CloudEvents over NATS.
Adding Kafka Subscription
Now, let's extend our example to include Kafka, another popular event-driven messaging system. We'll use the CloudEvents Kafka protocol to subscribe to events. First, we need to add Kafka configuration to our Config struct:
type Config struct {
// KafkaBrokers is a slice of urls to kafka brokers
KafkaBrokers []string `envconfig:"KAFKA_BROKERS" required:"true"`
// KafkaTopic is the Kafka topic to subscribe to
KafkaTopic string `envconfig:"KAFKA_TOPIC" required:"true"`
}
Next, we'll connect to Kafka:
kafkaProtocol, err := kafka.NewConsumer(
env.KafkaBrokers,
env.KafkaTopic,
kafka.WithConsumerGroup("my-group"),
)
if err != nil {
log.Fatalf("Failed to create protocol: %v", err)
}
defer kafkaProtocol.Close(ctx)
With this client, we can now receive CloudEvents from Kafka by calling the Receive method. This method blocks until a new event is available and then returns the event:
log.Println("Waiting for events...")
for {
event, err := kafkaClient.Receive(ctx)
if err != nil {
log.Printf("Failed to receive event: %v", err)
continue
}
log.Printf("Received event: %v", event)
}
Conclusion
By using CloudEvents, we've created a common contract for our events. Any service that understands CloudEvents can interact with our service, regardless of the language or platform it's built on. This makes our system more flexible and easier to integrate with.
Furthermore, CloudEvents provides several features that make working with events easier, such as standardized attributes for event time, event ID, source, type, and more. This allows us to handle events consistently, regardless of where they come from.
In conclusion, CloudEvents provides a powerful way to work with events in an event-driven system. With CloudEvents, we can focus on building our application logic, rather than worrying about how to format and parse our events.
Running event-driven workloads on Kubernetes?
Maincoders.AI monitors your clusters with AI — predicting failures before they disrupt your event pipelines.
Get Early Access →