Build a Dataflow streaming ETL pipeline to ClickHouse
In this section you’ll implement a real-time streaming ETL pipeline that ingests events from Pub/Sub, processes them using Dataflow (Apache Beam), and writes them into ClickHouse running on a GCP Axion (Arm64) VM.
Pub/Sub → Dataflow (Apache Beam) → ClickHouse (Axion VM)
Key components: Pub/Sub for event ingestion, Dataflow (Apache Beam) for streaming processing, and ClickHouse on the Axion Arm64 VM for storage and analytics.
Install Python 3.11 and the required system packages:
sudo zypper refresh
sudo zypper install -y python311 python311-pip python311-devel gcc gcc-c++
Verify installation:
python3.11 --version
pip3.11 --version
Create and activate a virtual environment; this avoids dependency conflicts with the system Python:
python3.11 -m venv beam-venv
source beam-venv/bin/activate
Install Apache Beam and the required dependencies for Dataflow:
pip install --upgrade pip
pip install "apache-beam[gcp]"
pip install requests
Verify Beam installation:
python -c "import apache_beam; print(apache_beam.__version__)"
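Optionally, you can confirm that Beam runs correctly on the VM before targeting Dataflow. The following is a minimal sketch (the file name and element values are arbitrary) that executes a trivial pipeline on the local DirectRunner:
# smoke_test_beam.py - trivial local pipeline to confirm the Beam installation works
import apache_beam as beam

# With no runner specified, Beam uses the local DirectRunner
with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(["beam", "is", "working"])
        | "Uppercase" >> beam.Map(str.upper)
        | "Print" >> beam.Map(print)
    )
If the words print in upper case, the environment is ready for the steps below.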
Connect to ClickHouse on the Axion VM:
clickhouse client
Create the target database and table for streaming inserts:
CREATE DATABASE IF NOT EXISTS realtime;
CREATE TABLE IF NOT EXISTS realtime.logs
(
event_time DateTime,
service String,
level String,
message String
)
ENGINE = MergeTree
ORDER BY event_time;
Verify the table:
SHOW TABLES FROM realtime;
Query id: aa25de9d-c07f-4538-803f-5473744631bc
┌─name─┐
1. │ logs │
└──────┘
1 row in set. Elapsed: 0.001 sec.
Exit the client:
exit;
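The Dataflow pipeline you build next writes rows through the ClickHouse HTTP interface, so it is worth checking that path independently first. The following is a minimal sketch, assuming ClickHouse listens on the default HTTP port 8123 with the default user and no password; replace <CLICKHOUSE_INTERNAL_IP> with your VM's internal IP address:
# check_clickhouse_http.py - insert one row and read it back over the HTTP interface
import requests

CLICKHOUSE_URL = "http://<CLICKHOUSE_INTERNAL_IP>:8123/"  # default ClickHouse HTTP port

# Insert a single tab-separated row into realtime.logs
row = "2025-12-30 11:00:00\tapi\tINFO\tHTTP INTERFACE TEST\n"
resp = requests.post(
    CLICKHOUSE_URL,
    data=row,
    headers={"Content-Type": "text/tab-separated-values"},
    params={"query": "INSERT INTO realtime.logs FORMAT TabSeparated"},
)
resp.raise_for_status()

# Read the newest row back
resp = requests.get(
    CLICKHOUSE_URL,
    params={"query": "SELECT * FROM realtime.logs ORDER BY event_time DESC LIMIT 1"},
)
print(resp.text)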
Before running Dataflow, confirm that messages can be published and pulled.
Publish a test message:
gcloud pubsub topics publish logs-topic \
--message '{"event_time":"2025-12-30 12:00:00","service":"api","level":"INFO","message":"PRE-DATAFLOW TEST"}'
Pull the message:
gcloud pubsub subscriptions pull logs-sub --limit=1 --auto-ack
┌───────────────────────────────────────────────────────────────────────────────────────────────────┬───────────────────┬──────────────┬────────────┬──────────────────┬────────────┐
│ DATA │ MESSAGE_ID │ ORDERING_KEY │ ATTRIBUTES │ DELIVERY_ATTEMPT │ ACK_STATUS │
├───────────────────────────────────────────────────────────────────────────────────────────────────┼───────────────────┼──────────────┼────────────┼──────────────────┼────────────┤
│ {"event_time":"2025-12-30 12:00:00","service":"api","level":"INFO","message":"PRE-DATAFLOW TEST"} │ 17590032987110080 │ │ │ │ SUCCESS │
└───────────────────────────────────────────────────────────────────────────────────────────────────┴───────────────────┴──────────────┴────────────┴──────────────────┴────────────┘
Successful output confirms that the topic accepts published messages and that the subscription delivers them (the ACK_STATUS column shows SUCCESS).
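You can also publish test events programmatically. The sketch below uses the google-cloud-pubsub client library, which is installed as part of apache-beam[gcp]; the topic name logs-topic matches the one used above, and <GCP_PROJECT_ID> is a placeholder for your project:
# publish_test_events.py - publish a few JSON events to the logs-topic topic
import json
from datetime import datetime, timezone

from google.cloud import pubsub_v1

PROJECT_ID = "<GCP_PROJECT_ID>"
TOPIC = f"projects/{PROJECT_ID}/topics/logs-topic"

publisher = pubsub_v1.PublisherClient()

for i in range(3):
    event = {
        "event_time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "service": f"service-{i}",
        "level": "INFO",
        "message": f"programmatic test message {i}",
    }
    # publish() returns a future; result() blocks until the message ID is assigned
    future = publisher.publish(TOPIC, json.dumps(event).encode("utf-8"))
    print("Published message ID:", future.result())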
Create the Dataflow pipeline file. It defines a streaming Beam pipeline that reads messages from the Pub/Sub subscription, parses each JSON payload, and inserts the resulting rows into ClickHouse over its HTTP interface:
vi dataflow_etl.py
Paste the following streaming pipeline:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT_ID = "<GCP_PROJECT_ID>"
SUBSCRIPTION = "projects/<GCP_PROJECT_ID>/subscriptions/<PUBSUB_SUBSCRIPTION_NAME>"
CLICKHOUSE_URL = "http://<CLICKHOUSE_INTERNAL_IP>:8123/"


class ParseMessage(beam.DoFn):
    """Decode the Pub/Sub payload and parse it as JSON."""
    def process(self, element):
        yield json.loads(element.decode("utf-8"))


class WriteToClickHouse(beam.DoFn):
    """Insert one row into realtime.logs via the ClickHouse HTTP interface."""
    def process(self, element):
        import requests  # imported here so it is available on Dataflow workers

        row = (
            f"{element['event_time']}\t"
            f"{element['service']}\t"
            f"{element['level']}\t"
            f"{element['message']}\n"
        )
        requests.post(
            CLICKHOUSE_URL,
            data=row,
            headers={"Content-Type": "text/tab-separated-values"},
            params={"query": "INSERT INTO realtime.logs FORMAT TabSeparated"}
        )


options = PipelineOptions(
    streaming=True,
    save_main_session=True
)

with beam.Pipeline(options=options) as p:
    (
        p
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
        | "Parse JSON" >> beam.ParDo(ParseMessage())
        | "Write to ClickHouse" >> beam.ParDo(WriteToClickHouse())
    )
Pipeline logic: ReadFromPubSub streams raw messages from the subscription, ParseMessage decodes each payload into a Python dictionary, and WriteToClickHouse formats the fields as a tab-separated row and POSTs it to the ClickHouse HTTP interface.
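The writer above issues one HTTP request per message, which keeps the example simple but adds per-row overhead. If you later need higher throughput, one option is to batch rows before inserting; the following is a sketch of that variant, assuming the same CLICKHOUSE_URL and target table (it is not required for this Learning Path):
# Optional variant: batch parsed rows before inserting to reduce HTTP round trips
import apache_beam as beam
import requests

CLICKHOUSE_URL = "http://<CLICKHOUSE_INTERNAL_IP>:8123/"

class WriteBatchToClickHouse(beam.DoFn):
    def process(self, batch):
        # batch is a list of parsed JSON dictionaries produced upstream
        rows = "".join(
            f"{e['event_time']}\t{e['service']}\t{e['level']}\t{e['message']}\n"
            for e in batch
        )
        requests.post(
            CLICKHOUSE_URL,
            data=rows,
            headers={"Content-Type": "text/tab-separated-values"},
            params={"query": "INSERT INTO realtime.logs FORMAT TabSeparated"},
        )

# In the pipeline, replace the single-row writer with:
#   | "Batch rows" >> beam.BatchElements(min_batch_size=10, max_batch_size=500)
#   | "Write to ClickHouse" >> beam.ParDo(WriteBatchToClickHouse())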
Replace <GCP_PROJECT_ID>, <PUBSUB_SUBSCRIPTION_NAME>, and <CLICKHOUSE_INTERNAL_IP> with your existing GCP project ID, Pub/Sub subscription name, and the internal IP address of your ClickHouse VM.
Below are the exact commands you can run from your VM to get each required value:
gcloud config get-value project        # prints the GCP project ID
gcloud pubsub subscriptions list       # lists the Pub/Sub subscription names in the project
hostname -I                            # prints the internal IP address of this (ClickHouse) VM
Launch the pipeline on managed Dataflow workers:
python3.11 dataflow_etl.py \
--runner=DataflowRunner \
--project=<GCP_PROJECT_ID> \
--region=<DATAFLOW_REGION> \
--temp_location=gs://<GCS_TEMP_BUCKET>/temp \
--streaming
<GCP_PROJECT_ID> – Your Google Cloud project ID (e.g. my-project-123)
<DATAFLOW_REGION> – Region where Dataflow runs (e.g. us-central1)
<GCS_TEMP_BUCKET> – Existing GCS bucket used for Dataflow temp files
Autoscaling is enabled for Dataflow Streaming Engine. Workers will scale between 1 and 100 unless maxNumWorkers is specified.
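If you want to cap autoscaling for cost control, you can set max_num_workers, either by adding --max_num_workers to the launch command or by setting it in the pipeline options. A minimal sketch of the in-code approach:
# Optional: cap Dataflow autoscaling from within the pipeline options
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    streaming=True,
    save_main_session=True,
    max_num_workers=4,   # Dataflow scales between 1 and 4 workers
)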
When the command returns, the job has been submitted to Dataflow; once it shows as Running in the Dataflow console, the pipeline is ready to consume messages.
Publish live streaming data:
gcloud pubsub topics publish logs-topic \
--message '{"event_time":"2025-12-30 13:30:00","service":"api","level":"INFO","message":"FRESH DATAFLOW WORKING"}'
Verify the data in ClickHouse. On the Axion VM, open clickhouse client and run:
SELECT *
FROM realtime.logs
ORDER BY event_time DESC
LIMIT 5;
Output:
SELECT *
FROM realtime.logs
ORDER BY event_time DESC
LIMIT 5
Query id: 74a105d0-2e04-4053-825c-d30e53424d14
┌──────────event_time─┬─service───┬─level─┬─message────────────────┐
1. │ 2025-12-30 13:30:00 │ api │ INFO │ FRESH DATAFLOW WORKING │
2. │ 2025-12-30 13:00:00 │ api │ INFO │ DATAFLOW FINAL SUCCESS │
3. │ 2025-12-30 12:45:00 │ api │ INFO │ FINAL DATAFLOW SUCCESS │
4. │ 2025-12-30 08:48:35 │ service-0 │ INFO │ benchmark message 0 │
5. │ 2025-12-30 08:48:34 │ service-1 │ INFO │ benchmark message 1 │
└─────────────────────┴───────────┴───────┴────────────────────────┘
This confirms that events published to Pub/Sub are processed by Dataflow and inserted into ClickHouse in near real time.
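If you prefer to check ingestion without an interactive client session, the sketch below queries the row count and the latest events over the ClickHouse HTTP interface; it assumes the same endpoint and open access as the earlier HTTP check:
# verify_ingest.py - check row count and most recent events over the HTTP interface
import requests

CLICKHOUSE_URL = "http://<CLICKHOUSE_INTERNAL_IP>:8123/"  # replace with your VM's internal IP

count = requests.get(CLICKHOUSE_URL, params={"query": "SELECT count() FROM realtime.logs"})
print("Rows in realtime.logs:", count.text.strip())

latest = requests.get(
    CLICKHOUSE_URL,
    params={"query": "SELECT * FROM realtime.logs ORDER BY event_time DESC LIMIT 5 FORMAT TSV"},
)
print(latest.text)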
You’ve successfully built a complete streaming ETL pipeline connecting Google Cloud Pub/Sub to ClickHouse running on an Arm-based Axion VM. The pipeline processes real-time events through Dataflow and stores them in ClickHouse for analytics.
This pipeline serves as the foundation for ClickHouse latency benchmarking and real-time analytics on Google Axion, which you’ll perform in the next section.