Sunday, September 7, 2025

Event-Driven Architecture on GCP with Pub/Sub & Spring Boot

Event‑Driven Architecture (EDA) decouples producers from consumers using an event broker. You get independent scaling, resilience, and faster iteration.


1) Why EDA?

  • Loose coupling: Producers don’t know who consumes; consumers don’t care who produced.
  • Elasticity: Scale consumers independently from producers.
  • Resilience: Retry, backoff, DLQs mean failures don’t cascade.
  • Speed: Teams ship features without synchronous dependencies.


2) Core building blocks (cloud‑agnostic)

  • Event – immutable record of something that happened, with a unique ID and timestamp (see the sketch after this list).
  • Producer – publishes events (APIs, batch jobs, scheduled triggers).
  • Broker – routes events (Pub/Sub, Kafka, RabbitMQ).
  • Subscription/Queue – delivery pipeline for a consumer.
  • Consumer – processes events (microservice, function, job).
  • DLQ – dead‑letter queue for poison messages.
  • Observability – logs, metrics, traces, payload samples.
  • Idempotency – ability to handle the same event more than once safely.
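
As a concrete sketch of the Event building block, here is a minimal immutable event as a Java record. The field names (type, version, payload) are illustrative assumptions, not a prescribed schema; they also anticipate the type + version advice in the governance section below.

import java.time.Instant;
import java.util.UUID;

// Immutable event: the ID and timestamp are stamped once at creation and never change.
public record Event(String id, String type, int version, Instant occurredAt, String payload) {

    // Factory method assigns a fresh unique ID and the current time.
    public static Event of(String type, int version, String payload) {
        return new Event(UUID.randomUUID().toString(), type, version, Instant.now(), payload);
    }
}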


3) Two reference patterns on GCP (the concepts are generic)

Pattern A: Cloud Scheduler → Pub/Sub → Spring Boot on GKE

Use this pattern when you need cron‑like triggers (hourly, daily) to kick off a pipeline or poll external systems.

+--------------+      +---------+      +------------------+
| Cloud        | ---> | Pub/Sub | ---> | Spring Boot on   |
| Scheduler    |      |  Topic  |      | GKE (consumer)   |
+--------------+      +---------+      +------------------+
        (produces events)            (subscribes & processes) 

Why this? It’s simple and cost‑effective, consumers scale horizontally, and it works well for batch/stream hybrids.

Notes:

  • Pub/Sub guarantees at‑least‑once delivery. Design consumers to be idempotent.
  • Use message ordering keys only if you truly need ordering; it reduces parallelism (see the publisher sketch after these notes).
  • Use DLQ subscriptions with retry policies.
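
If you do enable ordering, the ordering key is set per message and ordering must be switched on at the publisher. A minimal sketch with the plain Pub/Sub Java client library; the project and topic names reuse the examples below, and the payload is made up:

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class OrderedPublisherSketch {
    public static void main(String[] args) throws Exception {
        TopicName topic = TopicName.of("my_project_id", "my_topic_test");

        // Ordering must be enabled explicitly; messages sharing an ordering
        // key are then delivered in publish order (within that key only).
        Publisher publisher = Publisher.newBuilder(topic)
            .setEnableMessageOrdering(true)
            .build();

        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8("{\"event\":\"order-created\"}"))
            .setOrderingKey("customer-42") // all events for this key stay ordered
            .build();

        publisher.publish(message).get(); // block until the publish succeeds
        publisher.shutdown();
    }
}

Messages with different ordering keys still process in parallel, which is why a coarse key (one per customer, not one global key) keeps most of your throughput.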


Pattern B: Cloud Scheduler → Workflows → Pub/Sub → Spring Boot on GKE/Cloud Run

Add orchestration (branching, retries, fan‑out, calling APIs) before publishing an event.

+--------------+   +-----------+   +---------+   +----------------------+
| Cloud        |-->| Workflows |-->| Pub/Sub |-->| Spring Boot on GKE   |
| Scheduler    |   | (logic)   |   |  Topic  |   | or Cloud Run         |
+--------------+   +-----------+   +---------+   +----------------------+

Why this? Centralize control flow, enrich payloads, call external APIs, then publish. Swap the consumer with Cloud Run when you want scale‑to‑zero and fast cold starts for lightweight handlers.


4) Consumer code (Spring Boot, manual ack, idempotency)

Below is a trimmed version of a working setup using spring‑cloud‑gcp‑starter‑pubsub and Spring Integration; the same concepts apply to any queue or broker.

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
  <version>7.3.0</version>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-core</artifactId>
</dependency> 
import java.nio.charset.StandardCharsets;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.spring.core.GcpProjectIdProvider;
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.DefaultPublisherFactory;
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Slf4j
@Configuration
public class PubSubApplication {

  private final String topicName = "my_topic_test";
  private final String subscriptionName = "my_topic_test_sub";
  private final String projectId = "my_project_id";

  @Bean
  public PubSubConfiguration pubSubConfiguration() { return new PubSubConfiguration(); }

  @Bean
  public PubSubTemplate customPubSubTemplate(CredentialsProvider credentialsProvider) {
    GcpProjectIdProvider localProjectIdProvider = () -> projectId;
    PubSubConfiguration cfg = new PubSubConfiguration();
    cfg.initialize(projectId);
    DefaultPublisherFactory pub = new DefaultPublisherFactory(localProjectIdProvider);
    pub.setCredentialsProvider(credentialsProvider);
    DefaultSubscriberFactory sub = new DefaultSubscriberFactory(localProjectIdProvider, cfg);
    sub.setCredentialsProvider(credentialsProvider);
    return new PubSubTemplate(pub, sub);
  }

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(
      @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    var adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
  }

  @Bean
  public MessageChannel pubsubInputChannel() { return new DirectChannel(); }

  @Bean
  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      var payload = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
      var originalMessage = message.getHeaders()
        .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
      if (originalMessage == null) {
        log.error("Missing original Pub/Sub message header; cannot ack manually.");
        return;
      }
      var msgId = originalMessage.getPubsubMessage().getMessageId();

      // 1) Idempotency check (cache/DB): skip if already processed
      if (alreadyProcessed(msgId)) {
        log.warn("Duplicate delivery for messageId={}, ignoring.", msgId);
        originalMessage.ack();
        return;
      }

      try {
        log.info("Processing messageId={} payload={} ", msgId, payload);
        processBusinessLogic(payload);
        markProcessed(msgId);
        originalMessage.ack();
      } catch (Exception e) {
        log.error("Processing failed for messageId={}", msgId, e);
        // no ack: let Pub/Sub redeliver per retry policy -> DLQ if max reached
      }
    };
  }

  // producer (optional)
  @Bean
  @ServiceActivator(inputChannel = "pubsubOutputChannel")
  public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, topicName);
  }

  @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
  public interface PubsubOutboundGateway { void sendToPubsub(String text); }

  private boolean alreadyProcessed(String msgId) { /* Redis/DB/Cache lookup */ return false; }
  private void markProcessed(String msgId) { /* Persist msgId */ }
  private void processBusinessLogic(String payload) { /* your logic */ }
} 
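
With the gateway in place, publishing is one method call away. A hypothetical caller might look like the following; the service class and event JSON are illustrative, and this assumes the @MessagingGateway interface is picked up by Spring's component scanning:

import org.springframework.stereotype.Service;

// Hypothetical service that publishes through the gateway defined above.
@Service
public class OrderEventPublisher {

    private final PubSubApplication.PubsubOutboundGateway gateway;

    public OrderEventPublisher(PubSubApplication.PubsubOutboundGateway gateway) {
        this.gateway = gateway;
    }

    public void orderCreated(String orderId) {
        // Spring Integration routes this through pubsubOutputChannel to the topic.
        gateway.sendToPubsub("{\"event\":\"order-created\",\"orderId\":\"" + orderId + "\"}");
    }
}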

Idempotency options:

  • Redis with TTL (fast, good for short dedup windows; see the sketch after this list).
  • Database unique index on message_id (strong guarantee; pair it with an upsert).
  • Content hashing (payload hash + time window) when the broker doesn’t supply message IDs.
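
A minimal sketch of the Redis option using Spring Data Redis; the key prefix and the 24‑hour TTL are assumptions to tune for your dedup window:

import java.time.Duration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisIdempotencyStore {

    private final StringRedisTemplate redis;

    public RedisIdempotencyStore(StringRedisTemplate redis) {
        this.redis = redis;
    }

    // Atomically claims the message ID: returns true only for the first delivery.
    // SET NX with a TTL, so the dedup window expires on its own.
    public boolean tryClaim(String messageId) {
        Boolean first = redis.opsForValue()
            .setIfAbsent("dedup:" + messageId, "1", Duration.ofHours(24));
        return Boolean.TRUE.equals(first);
    }
}

Claim‑then‑process can still lose work if processing fails after the claim; for stronger guarantees, mark the ID only after the work commits, as the consumer above does with markProcessed.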


5) Designing for at‑least‑once delivery

Reality: Most brokers deliver at least once. Embrace it.

  • Make handlers idempotent (no side effects on duplicates). Examples: upsert by natural key, compare‑and‑swap, store message_id.
  • Deduplicate at write: Use DB constraints; on conflict do nothing/update.
  • Outbox pattern: Write to your DB + outbox table in one transaction; a relay publishes reliably (sketched after this list).
  • Poison messages: Route to DLQ with context (trace ID, payload sample, last error). Build a replay tool.
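
A minimal outbox sketch using Spring's JdbcTemplate; the orders and outbox tables and their columns are illustrative assumptions:

import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    private final JdbcTemplate jdbc;

    public OrderService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    // Business write and outbox write commit or roll back together,
    // so an event is recorded if and only if the order is.
    @Transactional
    public void placeOrder(String orderId, String payloadJson) {
        jdbc.update("INSERT INTO orders (id, status) VALUES (?, ?)", orderId, "NEW");
        jdbc.update(
            "INSERT INTO outbox (id, event_type, payload, published) VALUES (?, ?, ?, false)",
            UUID.randomUUID().toString(), "order-created", payloadJson);
    }
}

A separate relay (scheduled task or CDC) reads unpublished rows, publishes them to the topic, and flips the published flag. Publish and flip are not atomic, so the relay may occasionally publish twice; that is exactly why consumers stay idempotent.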


6) Scaling & delivery semantics

                     Cloud Run                         GKE
  Delivery           At‑least‑once (push, HTTP)        At‑least‑once (pull, streaming)
  Scaling            Scale to zero; per‑request        Pod autoscaling; always‑on pods
  Cold starts        Present, usually fast             None once pods are warm
  Best fit           Lightweight, spiky handlers       Heavy runtimes, sidecars, networking
Rule of thumb: Prefer Cloud Run for stateless, spiky, HTTP‑triggered consumers. Prefer GKE for heavy runtimes, sidecars, or advanced networking.


7) Security & governance

  • Least privilege service accounts; avoid long‑lived keys.
  • Schema governance (JSON Schema/OpenAPI/Avro). Version events with type + version.
  • PII controls: Mask in logs; encrypt at rest; restrict who can subscribe.
  • Contracts: Document event types and SLAs (latency, retention, retry policy).


Tuesday, August 5, 2025

🚀 Understanding O(1) vs O(n) – With Practical Code Examples

When writing efficient Java code, algorithmic complexity matters—a lot. In this post, I’ll walk you through two fundamental time complexities: O(1) and O(n), using clear Java 21 examples, and explain which one is more efficient for common search operations.



📌 What Do O(1) and O(n) Mean?

  • O(1) – Constant Time: The algorithm takes the same amount of time regardless of input size.
  • O(n) – Linear Time: The algorithm’s execution time increases linearly with input size.


🔍 Real Example: Name Search in a Collection

Let’s say you have a list of customer names, and you want to check if a name exists.


👎 O(n) - Linear Search (using List)

import java.util.List;

public class LinearSearchExample {
    public static boolean containsName(List<String> names, String target) {
        for (String name : names) {
            if (name.equals(target)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> customerNames = List.of("Alice", "Bob", "Henry", "Diana");
        System.out.println(containsName(customerNames, "Henry")); // true
    }
}


⏱️ Time Complexity: O(n)

🔄 The loop must check each name until it finds the target (or reaches the end).


✅ O(1) - Constant Time Lookup (using Set)

import java.util.Set;

public class ConstantTimeSearchExample {
    public static void main(String[] args) {
        Set<String> customerNames = Set.of("Alice", "Bob", "Henry", "Diana");

        boolean found = customerNames.contains("Henry"); // O(1) lookup
        System.out.println(found); // true
    }
}

⏱️ Time Complexity: O(1) on average

💡 Hash-based Set implementations (HashSet, or the immutable sets returned by Set.of) enable constant-time lookup on average.

⚖️ Which Is More Efficient?

  Approach             Structure       Lookup complexity   Best for
  Linear search        List            O(n)                Small or rarely searched data
  Hash-based lookup    Set (HashSet)   O(1) average        Large collections, frequent lookups
✅ Verdict: Use Set.contains() if you care about lookup speed—it’s much faster for large collections.
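
If you want to see the gap yourself, here is a rough timing sketch (not a proper benchmark; use JMH for real numbers):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LookupTimingSketch {
    public static void main(String[] args) {
        int n = 1_000_000;
        List<String> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add("customer-" + i);
        }
        Set<String> set = new HashSet<>(list);
        String target = "customer-" + (n - 1); // worst case for the linear scan

        long t0 = System.nanoTime();
        boolean inList = list.contains(target); // O(n): walks the whole list
        long t1 = System.nanoTime();
        boolean inSet = set.contains(target);   // O(1) average: one hash probe
        long t2 = System.nanoTime();

        System.out.printf("List: %b in %,d ns%n", inList, t1 - t0);
        System.out.printf("Set:  %b in %,d ns%n", inSet, t2 - t1);
    }
}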

✨ Final Thoughts

Understanding algorithm complexity helps you write scalable, high-performance Java applications.

Knowing when to switch from List to Set can make a massive difference in performance, especially when you're processing thousands (or millions) of items.






Sunday, May 18, 2025

🚀 Streaming PostgreSQL Changes to BigQuery using Cloud Run Jobs + Cloud Scheduler 🔄

This lightweight Change Data Capture (CDC) pipeline streams PostgreSQL logical replication events to BigQuery — no Debezium, no Kafka, just native GCP tools.



🧩 Architecture Overview

[Cloud SQL (PostgreSQL)]
   └─ Logical replication via wal2json plugin
        ↓
[Cloud Scheduler] (runs every 12 minutes)
   └─ Triggers →
[Cloud Run Job (custom CDC listener)]
   └─ Pulls logical changes from slot
   └─ Publishes change events (JSON) →
[Cloud Pub/Sub Topic]
   ↓
[Dataflow (Apache Beam Flex Template)]
   ↓
[BigQuery] 


📌 GitHub: github.com/HenryXiloj/demos-gcp/tree/main/pg-cdc-to-bq-streaming

💡 Key Highlights

✅ CDC listener written in Python using psycopg2 with logical replication

✅ Deployed as a Cloud Run Job for short-lived, stateless execution

✅ Orchestrated by Cloud Scheduler (runs every 12 minutes)

✅ Publishes change events as JSON to Pub/Sub

✅ Real-time ingestion into BigQuery using Apache Beam (Dataflow Flex Template); a minimal Beam sketch follows these highlights

✅ Includes graceful shutdown with SIGTERM handling
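
For a feel of the Dataflow stage, here is a minimal Beam pipeline sketch in Java. The repo ships an actual Flex Template, so treat this only as an illustration of the Pub/Sub-to-BigQuery shape; the topic, table, and column names are assumptions:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubToBigQuerySketch {
    public static void main(String[] args) {
        StreamingOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);

        // Read the raw JSON change events published by the CDC listener.
        PCollection<TableRow> rows = p
            .apply("ReadCdcEvents",
                PubsubIO.readStrings().fromTopic("projects/my-project/topics/cdc-events"))
            // Wrap each JSON payload in a single-column row; a real pipeline
            // would parse fields out of the wal2json document here.
            .apply("ToTableRow", MapElements
                .into(TypeDescriptor.of(TableRow.class))
                .via((String json) -> new TableRow().set("raw_event", json)))
            .setCoder(TableRowJsonCoder.of());

        rows.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:cdc.raw_events")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run();
    }
}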


🔄 Why Use Cloud Run Jobs?

  • Precise control over frequency & duration
  • No continuously running services
  • Fully stateless and pay-as-you-go
  • No external brokers or connectors required

⚖️ Compared to Traditional CDC (e.g., Debezium/Kafka)

🔹 No always-on infrastructure

🔹 No Zookeeper or Kafka to maintain

🔹 Native GCP integration for IAM, networking, and monitoring

🔹 Lower cost, easier to secure and deploy

⚡ Performance & Scalability

  • Handles thousands of changes per minute
  • Typical latency: 10–15 minutes
  • Scales with workload by adjusting Cloud Scheduler frequency

This design offers full control, better security posture, and serverless simplicity, making it ideal for event-driven analytics pipelines.

👉 Full implementation:

🔗 github.com/HenryXiloj/demos-gcp/tree/main/pg-cdc-to-bq-streaming
