Sunday, May 18, 2025

🚀 Streaming PostgreSQL Changes to BigQuery using Cloud Run Jobs + Cloud Scheduler 🔄

This lightweight Change Data Capture (CDC) pipeline streams PostgreSQL logical replication events to BigQuery — no Debezium, no Kafka, just native GCP tools.



🧩 Architecture Overview

[Cloud SQL (PostgreSQL)]
   └─ Logical replication via wal2json plugin
        ↓
[Cloud Scheduler] (runs every 12 minutes)
   └─ Triggers →
[Cloud Run Job (custom CDC listener)]
   └─ Pulls logical changes from slot
   └─ Publishes change events (JSON) →
[Cloud Pub/Sub Topic]
   ↓
[Dataflow (Apache Beam Flex Template)]
   ↓
[BigQuery] 
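
The last two hops (Pub/Sub → Dataflow → BigQuery) are a standard streaming Beam job. Here is a minimal Python sketch of that stage; the subscription and table names are hypothetical stand-ins for the repo's template parameters:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical names; the repo's Flex Template receives these as parameters.
SUBSCRIPTION = "projects/my-project/subscriptions/cdc-events-sub"
TABLE = "my-project:analytics.cdc_changes"

def run():
    opts = PipelineOptions(streaming=True, save_main_session=True)
    with beam.Pipeline(options=opts) as p:
        (p
         | "ReadPubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
         # Keep the raw wal2json payload in a single STRING column (hypothetical schema).
         | "ToRow" >> beam.Map(lambda raw: {"event_json": raw.decode("utf-8")})
         | "WriteBQ" >> beam.io.WriteToBigQuery(
               TABLE,
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

if __name__ == "__main__":
    run()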


📌 GitHub: github.com/HenryXiloj/demos-gcp/tree/main/pg-cdc-to-bq-streaming

💡 Key Highlights

✅ CDC listener written in Python using psycopg2 with logical replication (sketched after this list)

✅ Deployed as a Cloud Run Job for short-lived, stateless execution

✅ Orchestrated by Cloud Scheduler (runs every 12 minutes)

✅ Publishes change events as JSON to Pub/Sub

✅ Near-real-time ingestion into BigQuery using Apache Beam (Dataflow Flex Template)

✅ Includes graceful shutdown with SIGTERM handling
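
For reference, here is a minimal sketch of such a listener. It assumes an existing replication slot created with the wal2json plugin, and the connection, slot, and topic names are hypothetical placeholders; the repo's actual listener may differ in detail:

import signal

import psycopg2
import psycopg2.extras
from google.cloud import pubsub_v1

# Hypothetical names: adjust the connection, slot, and topic for your project.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "cdc-events")

shutdown = False

def handle_sigterm(signum, frame):
    # Cloud Run sends SIGTERM before stopping the job; finish the current
    # message, then stop consuming so the slot position is flushed.
    global shutdown
    shutdown = True

signal.signal(signal.SIGTERM, handle_sigterm)

conn = psycopg2.connect(
    host="10.0.0.3", dbname="appdb", user="replicator", password="secret",
    connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
# Attach to an existing replication slot created with the wal2json plugin.
cur.start_replication(slot_name="cdc_slot", decode=True)

def consume(msg):
    # msg.payload is a wal2json document describing one transaction.
    future = publisher.publish(topic_path, msg.payload.encode("utf-8"))
    future.result()  # wait for the publish before confirming the LSN
    # Confirm the LSN so PostgreSQL can recycle the WAL behind it.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)
    if shutdown:
        raise psycopg2.extras.StopReplication

try:
    cur.consume_stream(consume)
except psycopg2.extras.StopReplication:
    pass
finally:
    conn.close()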


🔄 Why Use Cloud Run Jobs?

  • Precise control over frequency & duration
  • No continuously running services
  • Fully stateless and pay-as-you-go
  • No external brokers or connectors required

⚖️ Compared to Traditional CDC (e.g., Debezium/Kafka)

🔹 No always-on infrastructure

🔹 No Zookeeper or Kafka to maintain

🔹 Native GCP integration for IAM, networking, and monitoring

🔹 Lower cost, easier to secure and deploy

⚡ Performance & Scalability

  • Handles thousands of changes per minute
  • Typical end-to-end latency: 10–15 minutes, bounded by the 12-minute schedule
  • Scales with workload by adjusting Cloud Scheduler frequency

This design offers full control, better security posture, and serverless simplicity, making it ideal for event-driven analytics pipelines.

👉 Full implementation:

🔗 github.com/HenryXiloj/demos-gcp/tree/main/pg-cdc-to-bq-streaming



Friday, May 2, 2025

Dataflow + Terraform: Secure Cloud SQL to BigQuery via PSA/PSC

 

Introduction

Two fully automated, production-grade pipelines demonstrate secure data ingestion from Cloud SQL PostgreSQL into BigQuery, using Apache Beam (Python) and Dataflow Flex Templates — all without public IP exposure.


✅ 1. Private Service Access (PSA) – Internal IP Connectivity

A Cloud SQL instance is provisioned with a private IP via PSA, and Dataflow connects over the internal VPC network.

📌 Key Highlights:

  • Cloud SQL + read replica
  • Private IP via PSA
  • VPC subnet + firewall on port 5432
  • Beam pipeline reads from PostgreSQL → BigQuery (see the sketch below)
  • Fully managed with Terraform
  • Optional GitHub Actions for CI/CD

🔌 JDBC Format: jdbc:postgresql://<private-ip>:5432/my-database2
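
A minimal Python sketch of that read-then-load flow, using psycopg2 inside a DoFn over the private IP; the table, credentials, and schema here are hypothetical stand-ins for the template parameters:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Hypothetical values; the real pipeline receives these as template parameters.
DB = dict(host="10.24.0.5", port=5432, dbname="my-database2",
          user="postgres", password="secret")

class ReadUsers(beam.DoFn):
    def process(self, _unused):
        import psycopg2  # imported here so Dataflow workers resolve it
        with psycopg2.connect(**DB) as conn, conn.cursor() as cur:
            cur.execute("SELECT id, name FROM users")
            for user_id, name in cur:
                yield {"id": user_id, "name": name}

def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | "Seed" >> beam.Create([None])      # one element to trigger the read
         | "ReadPostgres" >> beam.ParDo(ReadUsers())
         | "WriteBQ" >> beam.io.WriteToBigQuery(
               "my-project:my_dataset.users",
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

if __name__ == "__main__":
    run()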

🔗 Project Repo: 👉 github.com/.../cloud-sql-psa-with-bigquery


🔒 2. Private Service Connect (PSC) – DNS + SSL Certificate Access

PSC provides hostname-based access to Cloud SQL (clients connect through a private DNS name rather than a fixed IP), secured with SSL certificates stored in GCS.

📌 Key Highlights:

  • PSC forwarding rule + private DNS
  • SSL certs: server-ca.pem, client-cert.pem, client-key.pem
  • Enforced permissions (chmod 0600 on private key)
  • Beam pipeline loads certs at runtime from GCS
  • Flex Template accepts PSC host, cert path, and DB credentials
  • psycopg2 uses sslmode=verify-ca (see the sketch below)
  • Logs: success + errors written to BigQuery

🔗 Project Repo: 👉 github.com/.../cloud-sql-psc-with-bigquery
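
A sketch of the certificate handling at connection time; the bucket, host, and credential names are hypothetical stand-ins for the Flex Template parameters:

import os
import stat

import psycopg2
from google.cloud import storage

# Hypothetical parameters; the Flex Template passes the real values in.
BUCKET = "my-ssl-certs"
PSC_HOST = "my-instance.example.sql.goog"  # resolved via the private DNS zone

client = storage.Client()
bucket = client.bucket(BUCKET)
paths = {}
for name in ("server-ca.pem", "client-cert.pem", "client-key.pem"):
    local_path = f"/tmp/{name}"
    bucket.blob(f"certs/{name}").download_to_filename(local_path)
    paths[name] = local_path

# libpq rejects private keys readable by other users, hence chmod 0600.
os.chmod(paths["client-key.pem"], stat.S_IRUSR | stat.S_IWUSR)

conn = psycopg2.connect(
    host=PSC_HOST, port=5432, dbname="my-database2",
    user="postgres", password=os.environ["DB_PASSWORD"],
    sslmode="verify-ca",                  # verify the server cert against the CA
    sslrootcert=paths["server-ca.pem"],
    sslcert=paths["client-cert.pem"],
    sslkey=paths["client-key.pem"])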


🛠️ Shared Architecture & Automation (Both Pipelines)

  • terraform/: VPC, Cloud SQL, BigQuery, IAM, DNS
  • sql/: PostgreSQL table schema + init script
  • postgresql_to_bq_flex_template/: Beam logic, Dockerfile, metadata
  • GCS: stores SSL certs + Dataflow staging artifacts
  • Flex Templates fully parameterized via metadata.json
  • Error handling + retry logic + scalable Dataflow workers (error-path sketch below)
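
One way to realize the error-handling bullet in a Python Beam pipeline is to capture the rows BigQuery rejects and write them to a side table. The table names below are hypothetical, and the repo's exact approach may differ:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

def write_with_dead_letter(rows):
    # Stream rows into the main table, retrying transient insert errors.
    result = rows | "WriteMain" >> beam.io.WriteToBigQuery(
        "my-project:analytics.events",  # hypothetical destination table
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
        insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

    # Rejected rows arrive as (destination_table, row) tuples.
    return (result[BigQueryWriteFn.FAILED_ROWS]
            | "FormatError" >> beam.Map(
                  lambda err: {"table": str(err[0]), "row": str(err[1])})
            | "WriteErrors" >> beam.io.WriteToBigQuery(
                  "my-project:analytics.insert_errors",  # hypothetical error table
                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))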


📊 Use Case Comparison

  • PSA → the simpler option when Dataflow and Cloud SQL share a VPC and an internal IP is sufficient
  • PSC → the right fit when you need hostname-based access with enforced SSL/TLS and per-client certificates

Both implementations align with cloud security and automation best practices:

✅ No public IPs

✅ IAM least-privilege access

✅ SSL/TLS enforcement (PSC)

✅ Infrastructure as Code with Terraform

✅ CI/CD extensibility


These blueprints are ideal for teams building secure, private, and scalable data pipelines on Google Cloud Platform using Dataflow, Cloud SQL, and BigQuery.




Tuesday, February 18, 2025

Integrating Google Cloud Pub/Sub with Terraform and Spring Boot 3 (Java 21)

Introduction

In this blog post, I'll demonstrate how to provision Google Cloud Pub/Sub resources using Terraform and integrate them with a Spring Boot 3 application running Java 21. This setup enables seamless message publishing and subscription processing in a cloud-native environment.




Infrastructure Setup with Terraform

To automate the provisioning of Google Cloud resources, we'll use Terraform to create:

  • A Google Cloud Pub/Sub Topic and Subscription
  • A Service Account with the required IAM roles

Terraform Configuration

Provider Configuration (provider.tf)

terraform {
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "6.21.0"
    }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
  zone    = var.zone
}

provider "google-beta" {
  project = var.project_id
  region  = var.region
  zone    = var.zone
}

Variables (variable.tf)

variable "project_id" {
  default = ""
}

variable "region" {
  default = ""
}

variable "zone" {
  default = ""
}

variable "sec_region" {
  default = ""
}

variable "sec_zone" {
  default = ""
}

IAM Configuration (iam.tf)

resource "google_service_account" "pubsub_service_account" {
  project      = var.project_id
  account_id   = "pubsub-service-account-id"
  display_name = "Service Account for Pub/Sub"
}

output "service_account_email" {
  value = google_service_account.pubsub_service_account.email
}

resource "google_project_iam_member" "member-role" {
  for_each = toset(["roles/pubsub.admin"])
  role    = each.key
  project = var.project_id
  member  = "serviceAccount:${google_service_account.pubsub_service_account.email}"
}


Pub/Sub Resources (topic.tf)

resource "google_pubsub_topic" "my_topic" {
  project = var.project_id
  name    = "my_topic"
}

resource "google_pubsub_subscription" "my_subscription" {
  project = var.project_id
  name    = "my_subscription"
  topic   = google_pubsub_topic.my_topic.id
  message_retention_duration = "3600s"
  retain_acked_messages      = true
  ack_deadline_seconds       = 30

  retry_policy {
    minimum_backoff = "10s"
  }
}

Spring Boot 3 Implementation

We'll integrate Google Cloud Pub/Sub with a Spring Boot 3 application using the spring-cloud-gcp-starter-pubsub library.

Dependencies (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
        <version>6.0.0</version>
    </dependency>
    <!-- Channel adapters and @MessagingGateway used below -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>
    <!-- @Slf4j used in the configuration and controller classes -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

Configuration (application.yml)

spring:
  application:
    name: demo-pub-sub-api
  cloud:
    gcp:
      project-id: <my-project-id>

Main Application (DemoPubSubApiApplication.java)

@SpringBootApplication
public class DemoPubSubApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoPubSubApiApplication.class, args);
    }
}

Pub/Sub Integration (PubSubApplication.java)

import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.integration.outbound.PubSubMessageHandler;
import com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage;
import com.google.cloud.spring.pubsub.support.GcpPubSubHeaders;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@Slf4j
public class PubSubApplication {
    private final String topicName = "<MY-TOPIC-NAME>";                // e.g. "my_topic" from topic.tf
    private final String subscriptionName = "<MY-SUBSCRIPTION-NAME>";  // e.g. "my_subscription"

    // Inbound: pull messages from the subscription into a Spring channel.
    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(
            @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, subscriptionName);
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @Bean
    public MessageChannel pubsubInputChannel() {
        return new DirectChannel();
    }

    // Consumer: log each message, then ack it manually.
    @Bean
    @ServiceActivator(inputChannel = "pubsubInputChannel")
    public MessageHandler messageReceiver() {
        return message -> {
            log.info("Received Message: {}", new String((byte[]) message.getPayload()));
            BasicAcknowledgeablePubsubMessage originalMessage = message.getHeaders()
                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
            originalMessage.ack();
        };
    }

    // Outbound: anything written to pubsubOutputChannel is published to the topic.
    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
        return new PubSubMessageHandler(pubsubTemplate, topicName);
    }

    // Gateway injected into WebAppController to publish messages.
    @MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
    public interface PubsubOutboundGateway {
        void sendToPubsub(String text);
    }
}

REST Controller (WebAppController.java)

@RestController
@Slf4j
public class WebAppController {
    private final PubsubOutboundGateway messagingGateway;

    public WebAppController(PubsubOutboundGateway messagingGateway) {
        this.messagingGateway = messagingGateway;
    }

    @PostMapping("/publishMessage")
    public RedirectView publishMessage(@RequestParam("message") String message) {
        messagingGateway.sendToPubsub(message);
        return new RedirectView("/");
    }
}

Frontend (index.html)

<!DOCTYPE html>
<html>
<head>
    <title>Spring Integration GCP Pub/Sub</title>
</head>
<body>
    <form action="/publishMessage" method="post">
        Publish message: <input type="text" name="message" /> <input type="submit" value="Publish!"/>
    </form>
</body>
</html>

Running the Application

Provision Cloud Resources

terraform init
terraform apply

Set Environment Variables

export GOOGLE_APPLICATION_CREDENTIALS=mypath/application_default_credentials.json
export GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES=1

Run the Spring Boot Application

mvn spring-boot:run
Publish Messages from the Web Interface

  • Navigate to http://localhost:8080
  • Enter a message and click "Publish!"
  • The message should appear in the application logs:

PubSubApplication : Received Message: Hello pub sub!

GitHub Repository

The full code can be found on GitHub: github.com/HenryXiloj/demos-gcp

Conclusion

This post demonstrated how to integrate Google Cloud Pub/Sub with Terraform and Spring Boot 3 using Java 21. By leveraging Infrastructure as Code (IaC) with Terraform, we can ensure a consistent and repeatable setup while Spring Boot handles the message processing efficiently.



References

https://spring.io/guides/gs/messaging-gcp-pubsub
