Event‑Driven Architecture (EDA) decouples producers from consumers using an event broker. You get independent scaling, resilience, and faster iteration.
1) Why EDA?
- Loose coupling: Producers don’t know who consumes; consumers don’t care who produced.
- Elasticity: Scale consumers independently from producers.
- Resilience: Retry, backoff, DLQs mean failures don’t cascade.
- Speed: Teams ship features without synchronous dependencies.
2) Core building blocks (cloud‑agnostic)
- Event – immutable record of something that happened, with a unique ID and timestamp (a minimal Java sketch follows this list).
- Producer – publishes events (APIs, batch jobs, scheduled triggers).
- Broker – routes events (Pub/Sub, Kafka, RabbitMQ).
- Subscription/Queue – delivery pipeline for a consumer.
- Consumer – processes events (microservice, function, job).
- DLQ – dead‑letter queue for poison messages.
- Observability – logs, metrics, traces, payload samples.
- Idempotency – ability to handle the same event more than once safely.
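To make "immutable record with a unique ID and timestamp" concrete, here is a minimal sketch in Java. The event name OrderCreated and its fields are illustrative, not part of any particular schema.
import java.time.Instant;
import java.util.UUID;

// A Java record is immutable by construction: all fields are final, no setters.
// OrderCreated and its fields are hypothetical examples.
public record OrderCreated(
        String eventId,     // unique ID, e.g. a UUID assigned by the producer
        Instant occurredAt, // when the business fact happened, not when it was published
        String orderId      // domain payload
) {
    // Stamps ID and timestamp once, at creation time.
    public static OrderCreated of(String orderId) {
        return new OrderCreated(UUID.randomUUID().toString(), Instant.now(), orderId);
    }
}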
3) Two reference patterns on GCP (but generic concepts)
Pattern A: Cloud Scheduler → Pub/Sub → Spring Boot on GKE
When you need cron‑like triggers (hourly, daily) to kick off a pipeline or poll external systems.
+--------------+      +---------+      +------------------+
| Cloud        | ---> | Pub/Sub | ---> | Spring Boot on   |
| Scheduler    |      | Topic   |      | GKE (consumer)   |
+--------------+      +---------+      +------------------+
(produces events)                      (subscribes & processes)
Why this? Simple, cost‑effective, horizontally scalable consumers, works great for batch/stream hybrids.
Notes:
- Pub/Sub guarantees at‑least‑once delivery. Design consumers to be idempotent.
- Use message ordering keys only if you truly need ordering; they serialize delivery per key, which reduces parallelism.
- Use DLQ subscriptions with retry policies (a provisioning sketch follows these notes).
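If you manage infrastructure in code rather than the console, the google-cloud-pubsub admin client can create a subscription with both policies attached. A minimal sketch, assuming placeholder project/topic/subscription names; in practice the Pub/Sub service account also needs publish rights on the dead-letter topic.
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.RetryPolicy;
import com.google.pubsub.v1.Subscription;

public class CreateSubscriptionWithDlq {
    public static void main(String[] args) throws Exception {
        try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
            client.createSubscription(Subscription.newBuilder()
                .setName("projects/my_project_id/subscriptions/my_topic_test_sub")
                .setTopic("projects/my_project_id/topics/my_topic_test")
                .setAckDeadlineSeconds(30)
                // After 5 failed deliveries the message moves to the DLQ topic.
                .setDeadLetterPolicy(DeadLetterPolicy.newBuilder()
                    .setDeadLetterTopic("projects/my_project_id/topics/my_topic_test_dlq")
                    .setMaxDeliveryAttempts(5)
                    .build())
                // Exponential backoff between redelivery attempts.
                .setRetryPolicy(RetryPolicy.newBuilder()
                    .setMinimumBackoff(Duration.newBuilder().setSeconds(10).build())
                    .setMaximumBackoff(Duration.newBuilder().setSeconds(600).build())
                    .build())
                .build());
        }
    }
}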
Pattern B: Cloud Scheduler → Workflows → Pub/Sub → Spring Boot on GKE/Cloud Run
Add orchestration (branching, retries, fan‑out, calling APIs) before publishing an event.
+--------------+ +-----------+ +---------+ +----------------------+
| Cloud |-->| Workflows |-->| Pub/Sub |-->| Spring Boot on GKE |
| Scheduler | | (logic) | | Topic | | or Cloud Run |
+--------------+ +-----------+ +---------+ +----------------------+
Why this? Centralize control flow, enrich payloads, call external APIs, then publish. Swap the consumer with Cloud Run when you want scale‑to‑zero and fast cold starts for lightweight handlers.
4) Consumer code (Spring Boot, manual ack, idempotency)
Below is a trimmed version of a working setup using spring‑cloud‑gcp‑starter‑pubsub and Spring Integration — conceptually similar in any queue/broker.
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
<version>7.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory;
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Slf4j
@Configuration
public class PubSubApplication {
private final String topicName = "my_topic_test";
private final String subscriptionName = "my_topic_test_sub";
private final String projectId = "my_project_id";
@Bean
public PubSubConfiguration pubSubConfiguration() { return new PubSubConfiguration(); }
@Bean
public PubSubTemplate customPubSubTemplate(CredentialsProvider credentialsProvider) {
GcpProjectIdProvider localProjectIdProvider = () -> projectId;
PubSubConfiguration cfg = new PubSubConfiguration();
cfg.initialize(projectId);
DefaultPublisherFactory pub = new DefaultPublisherFactory(localProjectIdProvider);
pub.setCredentialsProvider(credentialsProvider);
DefaultSubscriberFactory sub = new DefaultSubscriberFactory(localProjectIdProvider, cfg);
sub.setCredentialsProvider(credentialsProvider);
return new PubSubTemplate(pub, sub);
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
var adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public MessageChannel pubsubInputChannel() { return new DirectChannel(); }
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
var payload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
// With AckMode.MANUAL the adapter always attaches the original message header.
var originalMessage = message.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
var msgId = originalMessage.getPubsubMessage().getMessageId();
// 1) Idempotency check (cache/DB): skip if already processed
if (alreadyProcessed(msgId)) {
log.warn("Duplicate delivery for messageId={}, ignoring.", msgId);
originalMessage.ack();
return;
}
try {
log.info("Processing messageId={} payload={} ", msgId, payload);
processBusinessLogic(payload);
markProcessed(msgId);
originalMessage.ack();
} catch (Exception e) {
log.error("Processing failed for messageId={}", msgId, e);
// no ack: let Pub/Sub redeliver per retry policy -> DLQ if max reached
}
};
}
// producer (optional)
@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, topicName);
}
@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubsubOutboundGateway { void sendToPubsub(String text); }
private boolean alreadyProcessed(String msgId) { /* Redis/DB/Cache lookup */ return false; }
private void markProcessed(String msgId) { /* Persist msgId */ }
private void processBusinessLogic(String payload) { /* your logic */ }
}
Idempotency options:
- Redis with TTL (fast, good for short windows; see the Redis sketch after this list).
- Database unique index on message_id (strong guarantee; add upsert).
- Hashing (content hash + window) when brokers don’t supply message IDs.
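For the Redis option, SET with NX and a TTL gives an atomic first-writer-wins check. A sketch using Spring Data Redis that could back the alreadyProcessed/markProcessed stubs above; StringRedisTemplate and a configured Redis connection are assumed.
import java.time.Duration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisIdempotencyStore {
    // Dedupe window; size it to your broker's maximum redelivery horizon.
    private static final Duration WINDOW = Duration.ofHours(24);

    private final StringRedisTemplate redis;

    public RedisIdempotencyStore(StringRedisTemplate redis) {
        this.redis = redis;
    }

    // Returns true exactly once per message ID within the window.
    // setIfAbsent maps to SET key value NX EX, which is atomic, so two
    // concurrent consumers can never both claim the same message.
    public boolean tryClaim(String messageId) {
        Boolean claimed = redis.opsForValue()
            .setIfAbsent("processed:" + messageId, "1", WINDOW);
        return Boolean.TRUE.equals(claimed);
    }
}
One subtlety: claiming before processing means a crash after the claim can lose a message. Either claim after successful processing (accepting rare duplicates) or use the database unique-index option when you need the stronger guarantee.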
5) Designing for at‑least‑once delivery
Reality: Most brokers deliver at least once. Embrace it.
- Make handlers idempotent (no side effects on duplicates). Examples: upsert by natural key, compare‑and‑swap, store message_id.
- Deduplicate at write: Use DB constraints; on conflict do nothing/update.
- Outbox pattern: Write to your DB and an outbox table in one transaction; a relay publishes reliably (sketched after this list).
- Poison messages: Route to DLQ with context (trace ID, payload sample, last error). Build a replay tool.
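A minimal sketch of the outbox write side, assuming Spring's JdbcTemplate and hypothetical orders/outbox tables; the relay that polls unsent rows, publishes them, and marks them sent is omitted.
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {
    private final JdbcTemplate jdbc;

    public OrderService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    // The business write and the outbox insert commit atomically:
    // either both happen or neither, so no event is lost or phantom.
    @Transactional
    public void createOrder(String orderId, String payloadJson) {
        jdbc.update("INSERT INTO orders (id, payload) VALUES (?, ?)", orderId, payloadJson);
        jdbc.update(
            "INSERT INTO outbox (event_id, event_type, payload, sent) VALUES (?, ?, ?, FALSE)",
            UUID.randomUUID().toString(), "OrderCreated", payloadJson);
    }
}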
6) Scaling & delivery semantics
Rule of thumb: Prefer Cloud Run for stateless, spiky, HTTP‑triggered consumers. Prefer GKE for heavy runtimes, sidecars, or advanced networking.
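Whichever runtime you pick, consumer throughput is mostly governed by subscriber flow control and executor threads. A sketch of how the DefaultSubscriberFactory from section 4 could be tuned, assuming these setters are available in your Spring Cloud GCP version; the numbers are illustrative.
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;

public final class SubscriberTuning {
    private SubscriberTuning() {}

    // Call on the DefaultSubscriberFactory built in customPubSubTemplate().
    public static void tune(DefaultSubscriberFactory sub) {
        // Cap in-flight messages per subscriber: bounds memory use and makes
        // horizontal scaling (more pods) the lever rather than more threads.
        sub.setFlowControlSettings(FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(100L)              // max unacked messages
            .setMaxOutstandingRequestBytes(10L * 1024 * 1024) // or 10 MiB of payload
            .build());
        // Parallelism within a single pod.
        sub.setExecutorProvider(InstantiatingExecutorProvider.newBuilder()
            .setExecutorThreadCount(4)
            .build());
    }
}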
7) Security & governance
- Least privilege service accounts; avoid long‑lived keys.
- Schema governance (JSON Schema/OpenAPI/Avro). Version events with type + version (see the envelope sketch at the end of this section).
- PII controls: Mask in logs; encrypt at rest; restrict who can subscribe.
- Contracts: Document event types and SLAs (latency, retention, retry policy).
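One lightweight way to version events is an envelope that carries type and version next to the payload, so consumers can route explicitly and reject unknown contracts instead of failing opaquely. A minimal sketch; the field names are illustrative, and in practice the payload would be validated against a registered JSON Schema or Avro schema.
import java.time.Instant;

// Envelope fields travel with every event; the payload is the domain part.
// type + version let consumers dispatch, and DLQ anything they don't know.
public record EventEnvelope(
        String eventId,
        String type,     // e.g. "order.created"
        int version,     // bump on breaking payload changes, e.g. 1 -> 2
        Instant occurredAt,
        String payload   // serialized domain payload (JSON, Avro as Base64, ...)
) {}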