Introduction
When migrating from the big red to PostgreSQL, most of the time you can afford the downtime of an export/import and a fresh start. It is simple and reliable, and Ora2pg is one of the go-to tools for that. But sometimes you cannot afford the downtime, either because the database is critical for business operations or because it is too big to run the export/import process.
Hence the following example of “logical replication” between Oracle and PostgreSQL using Flink CDC. I call it that even though it is an event stream process, because for DBAs it has roughly the same limitations and constraints as standard logical replication.
Here is the layout:
Oracle Source → Flink CDC → JDBC Sink → PostgreSQL Target
This approach is based on production experience migrating large Oracle databases, where we achieved throughput of 19,500 records per second—a 65x improvement over our initial baseline. But more importantly, it transformed a “big bang” migration event into a controlled, observable, and recoverable process.
The geek in me says that Flink CDC is a powerful tool for migrations. The consultant says it should not be used blindly—it’s relevant for specific use cases where the benefits outweigh the operational complexity.
What Each Piece Does
- Oracle (source): The source database. Flink CDC reads directly from tables via JDBC for snapshot mode, or from redo logs for streaming CDC mode.
- Flink CDC source (Oracle): A Flink table that wraps the Debezium Oracle connector. It reads data and turns it into a changelog stream (insert/update/delete). Key options control snapshot mode, parallelism, and fetch sizes.
- Flink runtime: Runs a streaming job that:
- Snapshot: Reads current table state, optionally in parallel chunks
- Checkpoints: State is stored so restarts resume exactly from the last acknowledged point
- Transforms: You can filter, project, cast types, and even aggregate in Flink SQL
- JDBC sink (PostgreSQL): Another Flink table. With a PRIMARY KEY defined, the connector performs UPSERT semantics (INSERT ... ON CONFLICT DO UPDATE in PostgreSQL; a sketch of the generated statement follows this list). It writes in batches, flushes on checkpoints, and retries on transient errors.
- PostgreSQL (target): Receives the stream and ends up with the migrated data. With proper tuning (especially rewriteBatchedInserts=true), it can handle high throughput.
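To make the upsert concrete, here is roughly the statement shape the JDBC sink produces on the PostgreSQL side. This is a minimal sketch for the CUSTOMERS table used later in the demo; the exact SQL the connector generates may differ slightly:
INSERT INTO customers (customer_id, first_name, last_name, email, phone, created_at, status)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (customer_id) DO UPDATE SET
  first_name = EXCLUDED.first_name,
  last_name  = EXCLUDED.last_name,
  email      = EXCLUDED.email,
  phone      = EXCLUDED.phone,
  created_at = EXCLUDED.created_at,
  status     = EXCLUDED.status;
Replaying the same row simply overwrites it, which is why at-least-once delivery is acceptable later in this setup.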
Flink and Debezium: How CDC Works
Flink CDC connectors embed Debezium, an open-source Change Data Capture platform that captures row-level changes by reading database transaction logs.
┌───────────────────────────────────────────────────────────────────────┐
│ Flink CDC Architecture │
│ │
│ ┌──────────────┐ ┌────────────────────────────────┐ ┌────────┐ │
│ │ Oracle │ │ Flink CDC Connector │ │ Sink │ │
│ │ Database │ │ ┌─────────────────────────┐ │ │Database│ │
│ │ │───►│ │ Debezium (embedded) │ │───►│ │ │
│ │ • Redo logs │ │ │ ───────────────── │ │ │ Post- │ │
│ │ • Tables │ │ │ • Oracle connector │ │ │ greSQL│ │
│ │ │ │ │ • Log parsing │ │ │ │ │
│ └──────────────┘ │ │ • Event streaming │ │ └────────┘ │
│ │ └─────────────────────────┘ │ │
│ └────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────┘
Why Debezium?
- Log-based CDC: Reads database transaction logs, not polling tables—much lower overhead
- Low impact: Minimal performance hit on source database
- Exactly-once delivery: When combined with Flink’s checkpointing
- Schema tracking: Handles schema evolution in streaming scenarios
Snapshot vs. CDC Modes
When you configure a Flink CDC source, you can choose:
- Snapshot Only: Read current table state (what we use in this demo)—fastest for one-time migrations
- Snapshot + CDC: Initial snapshot, then stream ongoing changes—for zero-downtime migrations
- CDC Only: Stream only new changes (requires existing snapshot)
Note: the snapshot itself can be done in one transaction (which can be long for big tables) or as an incremental snapshot. Since I am using Oracle Express Edition for this demo, I will stick with the standard snapshot. If you have big tables to load, Standard or Enterprise Edition is required for supplemental logging.
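For reference, the mode is selected through the startup option on the source table definition (shown later in Part 2). Here is a minimal sketch of the two common values; treat the exact option names and values as those of the Flink CDC Oracle connector version you actually run:
-- Snapshot first (used in this demo); with CDC enabled it keeps streaming afterwards
'scan.startup.mode' = 'initial',
-- CDC only: skip the snapshot and stream changes from the current log position
-- 'scan.startup.mode' = 'latest-offset',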
Anatomy of a Flink SQL Pipeline
A Flink SQL migration pipeline has four distinct parts. Understanding each part is critical for troubleshooting and optimization.
Part 1: Runtime Configuration (SET Statements)
These settings control how the Flink job executes. Think of them as the “knobs” you turn to tune behavior:
-- Pipeline identification
SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';
-- Runtime mode: STREAMING for CDC, BATCH for one-time loads
SET 'execution.runtime-mode' = 'STREAMING';
-- Parallelism: how many workers process data concurrently
SET 'parallelism.default' = '4';
-- Checkpointing: how often Flink saves progress for recovery
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.timeout' = '10 min';
SET 'execution.checkpointing.min-pause' = '30 s';
-- Restart strategy: what happens on failure
SET 'restart-strategy.type' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '10 s';
Key points:
- AT_LEAST_ONCE is faster than EXACTLY_ONCE for snapshot migrations where idempotency is guaranteed by upserts
- Checkpoint interval affects both recovery granularity and overhead
- Higher parallelism isn’t always better—you can hit contention on the target
Part 2: Source Table Definition (Oracle CDC)
This defines how Flink reads from Oracle. The column definitions must match your Oracle schema, using Flink SQL types:
DROP TABLE IF EXISTS src_customers;
CREATE TABLE src_customers (
-- Column definitions must match Oracle schema
-- Use Flink SQL types that map to Oracle types
CUSTOMER_ID DECIMAL(10,0),
FIRST_NAME STRING,
LAST_NAME STRING,
EMAIL STRING,
PHONE STRING,
CREATED_AT TIMESTAMP(6),
STATUS STRING,
-- Primary key is required for CDC (NOT ENFORCED = Flink won't validate)
PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
) WITH (
-- Connector type: oracle-cdc (uses Debezium internally)
'connector' = 'oracle-cdc',
-- Oracle connection details
'hostname' = 'oracle',
'port' = '1521',
'username' = 'demo',
'password' = 'demo',
-- Database configuration (pluggable database for Oracle XE)
-- Use url to connect via service name instead of SID
'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
'database-name' = 'XEPDB1',
'schema-name' = 'DEMO',
'table-name' = 'CUSTOMERS',
-- Snapshot mode: 'initial' = full snapshot, then stop (for snapshot-only)
'scan.startup.mode' = 'initial',
-- IMPORTANT: Disable incremental snapshot for this demo
-- Incremental snapshot requires additional Oracle configuration
'scan.incremental.snapshot.enabled' = 'false',
-- Debezium snapshot configuration
'debezium.snapshot.mode' = 'initial',
'debezium.snapshot.fetch.size' = '10000'
);
Key concepts:
- PRIMARY KEY NOT ENFORCED: Tells Flink the key exists but it won’t validate uniqueness
- scan.incremental.snapshot.enabled: Set to false for simple snapshots; true requires Oracle archive log mode and supplemental logging
- debezium.snapshot.fetch.size: How many rows to fetch per database round-trip—larger = fewer round-trips
Part 3: Sink Table Definition (PostgreSQL JDBC)
This defines how Flink writes to PostgreSQL:
DROP TABLE IF EXISTS sink_customers;
CREATE TABLE sink_customers (
-- Column definitions for PostgreSQL target
customer_id BIGINT,
first_name STRING,
last_name STRING,
email STRING,
phone STRING,
created_at TIMESTAMP(6),
status STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
-- Connector type: jdbc
'connector' = 'jdbc',
-- PostgreSQL connection with batch optimization
-- rewriteBatchedInserts=true is CRITICAL for performance (5-10x improvement)
'url' = 'jdbc:postgresql://postgres:5432/demo?rewriteBatchedInserts=true',
'table-name' = 'customers',
'username' = 'demo',
'password' = 'demo',
'driver' = 'org.postgresql.Driver',
-- Sink parallelism (tune based on target DB capacity)
-- Too high can cause contention; 4-8 is usually optimal
'sink.parallelism' = '4',
-- Buffer configuration for throughput
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '5 s',
-- Retry configuration
'sink.max-retries' = '3'
);
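Keep in mind that the JDBC sink writes into an existing table; it does not create it for you. A minimal sketch of the matching target DDL on PostgreSQL (column sizes are assumptions for this demo):
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    first_name  VARCHAR(50),
    last_name   VARCHAR(50),
    email       VARCHAR(100),
    phone       VARCHAR(20),
    created_at  TIMESTAMP(6),
    status      VARCHAR(10)
);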
Key optimization: rewriteBatchedInserts=true is critical for PostgreSQL performance. This tells the JDBC driver to rewrite individual INSERT statements into a single multi-row INSERT:
Without this:
INSERT INTO t (a,b) VALUES (1,'x');
INSERT INTO t (a,b) VALUES (2,'y');
INSERT INTO t (a,b) VALUES (3,'z');
With rewriteBatchedInserts=true:
INSERT INTO t (a,b) VALUES (1,'x'),(2,'y'),(3,'z');
This single change gave us a 5-10x throughput improvement in production.
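If you want to confirm the rewrite is actually happening, one option is to look at the statements PostgreSQL sees, assuming the pg_stat_statements extension is installed on the target:
-- Rewritten batches show up as multi-row INSERTs with many parameter placeholders
SELECT calls, left(query, 80) AS query
FROM pg_stat_statements
WHERE query LIKE 'INSERT INTO customers%'
ORDER BY calls DESC;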
Part 4: Data Flow (INSERT…SELECT)
This starts the actual data migration. The CAST operations convert Oracle types to PostgreSQL types:
INSERT INTO sink_customers
SELECT
CAST(CUSTOMER_ID AS BIGINT) AS customer_id,
FIRST_NAME AS first_name,
LAST_NAME AS last_name,
EMAIL AS email,
PHONE AS phone,
CREATED_AT AS created_at,
STATUS AS status
FROM src_customers;
This single statement:
- Reads from the Oracle source table
- Transforms data types (CAST operations)
- Writes to the PostgreSQL sink table
- Handles batching, parallelism, and fault tolerance automatically
Complete SQL Example
Here is the complete migration pipeline that you can run in Flink SQL Client. This is production-ready code with all the optimizations we’ve discussed:
-- =============================================================================
-- Flink CDC Pipeline: Migrate CUSTOMERS table (Oracle -> PostgreSQL)
-- =============================================================================
-- Mode: Snapshot-only (no incremental, no streaming)
-- Source: Oracle XE 21c
-- Target: PostgreSQL 18
-- =============================================================================
-- ============================================================================
-- PART 1: Runtime Configuration (SET statements)
-- ============================================================================
-- These settings control how the Flink job executes
SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';
SET 'execution.runtime-mode' = 'STREAMING';
SET 'parallelism.default' = '4';
-- Checkpointing configuration
-- AT_LEAST_ONCE is faster for snapshot/migration workloads
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.timeout' = '10 min';
SET 'execution.checkpointing.min-pause' = '30 s';
-- Restart strategy for fault tolerance
SET 'restart-strategy.type' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '10 s';
-- ============================================================================
-- PART 2: Source Table Definition (Oracle CDC)
-- ============================================================================
-- This defines how Flink reads from Oracle using Debezium under the hood
DROP TABLE IF EXISTS src_customers;
CREATE TABLE src_customers (
-- Column definitions must match Oracle schema
-- Use Flink SQL types that map to Oracle types
CUSTOMER_ID DECIMAL(10,0),
FIRST_NAME STRING,
LAST_NAME STRING,
EMAIL STRING,
PHONE STRING,
CREATED_AT TIMESTAMP(6),
STATUS STRING,
-- Primary key is required for CDC (NOT ENFORCED = Flink won't validate)
PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
) WITH (
-- Connector type: oracle-cdc (uses Debezium internally)
'connector' = 'oracle-cdc',
-- Oracle connection details
'hostname' = 'oracle',
'port' = '1521',
'username' = 'demo',
'password' = 'demo',
-- Database configuration (pluggable database for Oracle XE)
-- Use url to connect via service name instead of SID
'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
'database-name' = 'XEPDB1',
'schema-name' = 'DEMO',
'table-name' = 'CUSTOMERS',
-- Snapshot mode: 'initial' = full snapshot, then stop (for snapshot-only)
'scan.startup.mode' = 'initial',
-- IMPORTANT: Disable incremental snapshot for this demo
-- Incremental snapshot requires additional Oracle configuration
'scan.incremental.snapshot.enabled' = 'false',
-- Debezium snapshot configuration
'debezium.snapshot.fetch.size' = '10000'
);
-- ============================================================================
-- PART 3: Sink Table Definition (PostgreSQL JDBC)
-- ============================================================================
-- This defines how Flink writes to PostgreSQL
DROP TABLE IF EXISTS sink_customers;
CREATE TABLE sink_customers (
-- Column definitions for PostgreSQL target
customer_id BIGINT,
first_name STRING,
last_name STRING,
email STRING,
phone STRING,
created_at TIMESTAMP(6),
status STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
-- Connector type: jdbc
'connector' = 'jdbc',
-- PostgreSQL connection with batch optimization
-- rewriteBatchedInserts=true is CRITICAL for performance (5-10x improvement)
'url' = 'jdbc:postgresql://postgres:5432/demo?rewriteBatchedInserts=true',
'table-name' = 'customers',
'username' = 'demo',
'password' = 'demo',
'driver' = 'org.postgresql.Driver',
-- Sink parallelism (tune based on target DB capacity)
-- Too high can cause contention; 4-8 is usually optimal
'sink.parallelism' = '4',
-- Buffer configuration for throughput
'sink.buffer-flush.max-rows' = '10000',
'sink.buffer-flush.interval' = '5 s',
-- Retry configuration
'sink.max-retries' = '3'
);
-- ============================================================================
-- PART 4: Data Flow (INSERT...SELECT)
-- ============================================================================
-- This starts the actual data migration
-- CAST operations convert Oracle types to PostgreSQL types
INSERT INTO sink_customers
SELECT
CAST(CUSTOMER_ID AS BIGINT) AS customer_id,
FIRST_NAME AS first_name,
LAST_NAME AS last_name,
EMAIL AS email,
PHONE AS phone,
CREATED_AT AS created_at,
STATUS AS status
FROM src_customers;
LAB Setup
In my lab I am using PG18 and Oracle XE Docker containers, plus the Flink JobManager and TaskManager containers, with the following definition:
Create a docker-compose.yml:
services:
# Oracle Database 21c XE (Source)
oracle:
image: gvenzl/oracle-xe:21-slim-faststart
container_name: oracle-demo
environment:
ORACLE_PASSWORD: OracleDemo123
APP_USER: demo
APP_USER_PASSWORD: demo
ports:
- "1521:1521"
volumes:
- oracle-data:/opt/oracle/oradata
- ./oracle-init:/container-entrypoint-initdb.d
healthcheck:
test: ["CMD", "healthcheck.sh"]
interval: 30s
timeout: 10s
retries: 10
start_period: 120s
networks:
- flink-network
# PostgreSQL 18 (Target)
postgres:
image: postgres:18
container_name: postgres-demo
environment:
POSTGRES_USER: demo
POSTGRES_PASSWORD: demo
POSTGRES_DB: demo
ports:
- "5432:5432"
volumes:
- postgres-data:/var/lib/postgresql
- ./postgres-init:/docker-entrypoint-initdb.d
healthcheck:
test: ["CMD-SHELL", "pg_isready -U demo -d demo"]
interval: 10s
timeout: 5s
retries: 5
networks:
- flink-network
# Flink Job Manager
flink-jobmanager:
build:
context: ./flink
dockerfile: Dockerfile
container_name: flink-jobmanager
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
jobmanager.memory.process.size: 1600m
parallelism.default: 4
state.backend.type: rocksdb
volumes:
- ./pipelines:/opt/flink/pipelines
networks:
- flink-network
depends_on:
oracle:
condition: service_healthy
postgres:
condition: service_healthy
# Flink Task Manager
flink-taskmanager:
build:
context: ./flink
dockerfile: Dockerfile
container_name: flink-taskmanager
command: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 8
networks:
- flink-network
depends_on:
- flink-jobmanager
volumes:
oracle-data:
postgres-data:
networks:
flink-network:
driver: bridge
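The compose file mounts ./oracle-init and ./postgres-init as init-script directories. Their exact content is not central to the pipeline, but as a sketch, something like the following seeds the 10,000-row CUSTOMERS table used later (naming and the seeding pattern are illustrative; depending on how the image runs init scripts you may need to switch to the pluggable database and schema first). The PostgreSQL side only needs the customers table DDL shown in Part 3.
-- oracle-init/01-customers.sql
-- ALTER SESSION SET CONTAINER = XEPDB1;  -- only if the script does not already run inside the PDB
CREATE TABLE demo.customers (
    customer_id NUMBER(10)   PRIMARY KEY,
    first_name  VARCHAR2(50),
    last_name   VARCHAR2(50),
    email       VARCHAR2(100),
    phone       VARCHAR2(20),
    created_at  TIMESTAMP(6) DEFAULT SYSTIMESTAMP,
    status      VARCHAR2(10)
);
INSERT INTO demo.customers (customer_id, first_name, last_name, email, phone, status)
SELECT level,
       'FirstName' || level,
       'LastName'  || level,
       'customer'  || level || '@example.com',
       '+1-555-'   || level,
       'ACTIVE'
FROM dual CONNECT BY level <= 10000;
COMMIT;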
Create flink/Dockerfile:
FROM flink:1.20.3-scala_2.12-java11
# Download Flink CDC connector for Oracle
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/3.5.0/flink-sql-connector-oracle-cdc-3.5.0.jar
# Download JDBC connector
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar
# Download PostgreSQL JDBC driver
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.4/postgresql-42.7.4.jar
# Download Oracle JDBC driver
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/23.5.0.24.07/ojdbc11-23.5.0.24.07.jar
Access the Flink Web UI at: http://localhost:8081
Running the Migration
Let’s execute the actual migration with full command outputs.
Step 1: Verify Source Data (Before Migration)
$ docker exec oracle-demo bash -c "echo 'SELECT COUNT(*) FROM customers;' | \
sqlplus -s demo/demo@//localhost:1521/XEPDB1"
COUNT(*)
----------
10000
Step 2: Verify Target is Empty (Before Migration)
$ docker exec postgres-demo psql -U demo -d demo -c "SELECT COUNT(*) FROM customers;"
count
-------
0
(1 row)
Step 3: Run the Migration Pipeline
$ docker exec flink-jobmanager /opt/flink/bin/sql-client.sh \
-f /opt/flink/pipelines/migrate-customers.sql
[INFO] Executing SQL from file.
Flink SQL> SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';
[INFO] Execute statement succeeded.
...
Flink SQL> INSERT INTO sink_customers SELECT ...
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c554d99dce69b084607080502c13ffca
Step 4: Monitor Progress
Check the Flink Web UI at http://localhost:8081 or use the REST API:
$ curl -s http://localhost:8081/jobs | jq '.jobs[] | select(.status == "RUNNING" or .status == "FINISHED")'
{
"id": "c554d99dce69b084607080502c13ffca",
"status": "FINISHED"
}
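While the job is running, you can also watch rows arriving on the target through PostgreSQL's statistics views (assuming the demo table):
SELECT relname, n_tup_ins, n_live_tup
FROM pg_stat_user_tables
WHERE relname = 'customers';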
Step 5: Verify Migration (After)
$ docker exec postgres-demo psql -U demo -d demo -c "SELECT COUNT(*) FROM customers;"
count
-------
10000
(1 row)
$ docker exec postgres-demo psql -U demo -d demo -c "SELECT * FROM customers LIMIT 3;"
customer_id | first_name | last_name | email | phone
-------------+---------------+--------------+--------------------------+-------------
8836 | FirstName8836 | LastName8836 | [email protected] | +1-555-8836
4740 | FirstName4740 | LastName4740 | [email protected] | +1-555-4740
8835 | FirstName8835 | LastName8835 | [email protected] | +1-555-8835
(3 rows)
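Row counts are a good first check; for extra peace of mind you can compare a cheap aggregate on both sides. A minimal sketch (the choice of column and aggregates is illustrative):
-- Run on Oracle (source) and PostgreSQL (target); the results should match
SELECT COUNT(*), MIN(customer_id), MAX(customer_id), SUM(customer_id) FROM customers;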
Type Mapping Reference
Before migrating data, you need to understand the type conversions. Here’s a reference (a short CAST sketch follows the table):
| Oracle | Flink SQL | PostgreSQL | Notes |
|---|---|---|---|
| NUMBER(10) | DECIMAL(10,0) | BIGINT | Use CAST in SELECT |
| NUMBER(12,2) | DECIMAL(12,2) | NUMERIC(12,2) | Direct mapping |
| VARCHAR2(n) | STRING | VARCHAR(n) | Direct mapping |
| DATE | TIMESTAMP(6) | TIMESTAMP | Oracle DATE includes time |
| TIMESTAMP | TIMESTAMP(6) | TIMESTAMP | Direct mapping |
| CLOB | STRING | TEXT | Large text |
| BLOB | BYTES | BYTEA | Binary data |
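In practice these mappings materialize as CASTs in the INSERT...SELECT. A short illustrative sketch with hypothetical ORDERS columns, following the table above:
INSERT INTO sink_orders
SELECT
    CAST(ORDER_ID   AS BIGINT)        AS order_id,    -- NUMBER(10)    -> BIGINT
    CAST(AMOUNT     AS DECIMAL(12,2)) AS amount,      -- NUMBER(12,2)  -> NUMERIC(12,2)
    CAST(ORDER_DATE AS TIMESTAMP(6))  AS order_date,  -- DATE          -> TIMESTAMP (time part preserved)
    DESCRIPTION                       AS description  -- CLOB (STRING) -> TEXT
FROM src_orders;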
Performance Optimization: What We Learned
Based on production experience, here are the key optimizations that improved throughput from 300 rec/sec to 19,500 rec/sec (65x improvement).
Understanding CPU-Bound vs. IOPS-Bound Pipelines
Before tuning, you need to understand what’s limiting your pipeline. This is critical because the solutions are different:
CPU-Bound Pipeline:
- Symptoms: High CPU usage on Flink Task Manager, low disk I/O on target database
- Cause: Complex transformations, serialization/deserialization overhead, too few parallel workers
- Solution: Increase parallelism, simplify transformations, use more Task Manager slots
IOPS-Bound Pipeline:
- Symptoms: Low CPU usage on Flink, high disk I/O or lock contention on target database
- Cause: Too many small writes, target database bottleneck, excessive parallelism causing lock contention
- Solution: Larger batch sizes, rewriteBatchedInserts=true, reduce sink parallelism, tune target database
Network-Bound Pipeline:
- Symptoms: High network wait times, gaps between source reads and sink writes
- Cause: Small fetch sizes, high latency between Flink and databases
- Solution: Larger fetch sizes, co-locate components when possible
How to Identify Your Bottleneck
In the Flink Web UI, look at:
- Backpressure indicators: Red/yellow backpressure on source = sink can’t keep up (IOPS-bound)
- Records sent/received: Compare source output rate vs. sink input rate
- Checkpoint duration: Long checkpoints often indicate IOPS issues on state backend
- Task Manager metrics: CPU%, memory usage, GC pauses
On your databases:
-- Oracle: Check redo log generation rate
SELECT * FROM V$SYSSTAT WHERE NAME LIKE '%redo%';
-- PostgreSQL: Check write activity
SELECT * FROM pg_stat_bgwriter;
SELECT * FROM pg_stat_database WHERE datname = 'demo';
Critical Optimizations
1. JDBC Batch Rewriting (5-10x Improvement)
The single most impactful optimization for IOPS-bound pipelines:
'url' = 'jdbc:postgresql://host/db?rewriteBatchedInserts=true'
This is so important I’ll repeat it: this single parameter gave us 5-10x throughput improvement. Without it, every row is a separate INSERT statement. With it, rows are batched into efficient multi-row INSERTs.
2. Sink Parallelism (2-4x Improvement)
More workers can process more data—but there’s a sweet spot:
'sink.parallelism' = '12'
Our testing showed:
| Parallelism | Throughput | Notes |
|---|---|---|
| 1 | 5,000 rec/sec | Baseline |
| 4 | 12,000 rec/sec | Good improvement |
| 8 | 17,000 rec/sec | Still scaling |
| 12 | 19,500 rec/sec | Sweet spot |
| 16 | 18,000 rec/sec | Contention starts |
| 24 | 15,000 rec/sec | Too much contention |
Why does too much parallelism hurt? Lock contention on the target database. Each parallel writer tries to acquire locks, and beyond a certain point, they spend more time waiting than writing.
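You can observe this directly on the target while the job runs; a simple sketch grouping active sessions by wait event:
SELECT wait_event_type, wait_event, COUNT(*)
FROM pg_stat_activity
WHERE datname = 'demo' AND state = 'active'
GROUP BY wait_event_type, wait_event
ORDER BY COUNT(*) DESC;
If most sessions sit on Lock or LWLock waits instead of doing work, reducing sink parallelism usually helps more than adding workers.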
3. Buffer Size Tuning
Larger buffers = fewer flushes = better throughput (at cost of memory and latency):
'sink.buffer-flush.max-rows' = '50000'
'sink.buffer-flush.interval' = '10 s'
For IOPS-bound pipelines, larger buffers are critical. For CPU-bound pipelines, smaller buffers with higher parallelism may be better.
4. Source Fetch Size
Reduce round-trips to the source database:
-- For JDBC connector:
'scan.fetch-size' = '20000'
-- For CDC connector:
'debezium.snapshot.fetch.size' = '20000'
Larger fetch sizes reduce network overhead but increase memory usage. Find your balance based on available memory.
5. Checkpointing Mode
For migrations (where exactly-once is less critical):
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
AT_LEAST_ONCE is faster than EXACTLY_ONCE because it doesn’t require barriers to align data across all parallel paths. Since our sink uses upserts (INSERT ON CONFLICT), duplicate processing is idempotent anyway.
6. Checkpoint Interval
Longer intervals = less overhead, but longer recovery time on failure:
SET 'execution.checkpointing.interval' = '60 s';
For our production migrations, 45-60 seconds was optimal. Shorter intervals caused excessive state backend I/O (another IOPS consideration).
Performance Reference
| Setting | Baseline | Optimized | Impact |
|---|---|---|---|
| rewriteBatchedInserts | false | true | 5-10x |
| sink.parallelism | 1 | 12 | 2-4x |
| buffer-flush.max-rows | 1000 | 50000 | 1.5-2x |
| fetch-size | 1000 | 20000 | 1.3-1.5x |
| checkpoint.mode | EXACTLY_ONCE | AT_LEAST_ONCE | 1.2-1.3x |
| Combined Throughput | 300 rec/sec | 19,500 rec/sec | 65x |
Real-World Tuning Process
Here’s how I approach tuning a new migration:
- Start with defaults: Run the pipeline and observe behavior in Flink UI
- Identify the bottleneck: Is it CPU, IOPS, or network?
- Apply the biggest lever first: Usually rewriteBatchedInserts=true for PostgreSQL
- Increase parallelism gradually: Watch for the point where throughput stops improving
- Tune batch sizes: Larger for IOPS-bound, smaller for CPU-bound
- Monitor the target database: Watch for lock contention, checkpoint lag, WAL accumulation (see the query sketch after this list)
- Document your findings: Each environment is different; what works for one may not work for another
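For step 6, the target-side WAL and checkpoint pressure can be watched from SQL. A sketch for recent PostgreSQL versions (pg_stat_wal appeared in 14 and pg_stat_checkpointer in 17; on older versions the checkpoint counters live in pg_stat_bgwriter):
-- WAL volume generated since the last stats reset
SELECT wal_records, wal_bytes FROM pg_stat_wal;
-- Checkpoint frequency (timed vs. requested), PostgreSQL 17+
SELECT num_timed, num_requested FROM pg_stat_checkpointer;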
Incremental Snapshot for Large Databases
For databases larger than ~100GB, incremental snapshot mode is essential. Instead of reading entire tables at once (which can cause locks and memory issues), incremental snapshot divides tables into chunks.
What is Incremental Snapshot?
┌─────────────────────────────────────────────────────────────────┐
│ Incremental Snapshot │
│ │
│ Table (1M rows, chunked by ID): │
│ │
│ ┌───────┬───────┬───────┬───────┬───────┐ │
│ │ Chunk │ Chunk │ Chunk │ Chunk │ Chunk │ │
│ │ 1 │ 2 │ 3 │ 4 │ 5 │ ... │
│ │ 1-200K│200K- │400K- │600K- │800K- │ │
│ │ │ 400K │ 600K │ 800K │ 1M │ │
│ └───┬───┴───┬───┴───┬───┴───┬───┴───┬───┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ Process in parallel, no table locks │
│ │
└─────────────────────────────────────────────────────────────────┘
Oracle Requirements
Incremental snapshot with CDC requires additional Oracle configuration:
- Archive Log Mode: Must be enabled
-- Check current mode
SELECT LOG_MODE FROM V$DATABASE;
-- Enable (requires DB restart)
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;
- Supplemental Logging:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
- LogMiner Privileges for the CDC user:
GRANT SELECT ON V_$DATABASE TO cdc_user;
GRANT SELECT ON V_$LOG TO cdc_user;
GRANT SELECT ON V_$LOGFILE TO cdc_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO cdc_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc_user;
GRANT FLASHBACK ANY TABLE TO cdc_user;
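To confirm the database-level settings took effect, query V$DATABASE:
SELECT LOG_MODE, SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK FROM V$DATABASE;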
Pipeline Configuration
CREATE TABLE src_large_table (...) WITH (
'connector' = 'oracle-cdc',
'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
'database-name' = 'XEPDB1',
'schema-name' = 'DEMO',
'table-name' = 'LARGE_TABLE',
-- Enable incremental snapshot
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '100000',
'scan.incremental.snapshot.chunk.key-column' = 'ID',
-- Debezium settings
'debezium.snapshot.fetch.size' = '20000'
);
When to Use Incremental Snapshot
| Database Size | Recommendation |
|---|---|
| < 10 GB | Standard snapshot (JDBC) |
| 10-100 GB | Either approach works |
| > 100 GB | Incremental snapshot required |
| Active production DB | Incremental snapshot recommended |
Production Implementation Advice
Before taking this approach to production, there are several considerations to keep in mind. First, this lab setup runs Flink in standalone mode which is fine for testing but lacks persistence—if you restart the Flink processes, your pipelines are lost. For production, you’ll want to deploy on Kubernetes using the official Flink Kubernetes Operator, which provides proper state management, automatic recovery, and horizontal scaling. Second, pay close attention to version compatibility because not all latest versions of Flink, CDC connectors, and JDBC drivers work together—I learned this the hard way, so check the compatibility matrix before building your stack and stick with LTS versions like Flink 1.20 when possible. Third, externalize your checkpoints to durable storage like S3, MinIO, or HDFS rather than local filesystem, as this enables true fault tolerance and job recovery across restarts. Fourth, implement proper monitoring by connecting Flink’s metrics to Prometheus and Grafana, setting up alerts for checkpoint failures, backpressure, and throughput drops—the Web UI is great for debugging but not for 24/7 operations. Fifth, secure your connections by using SSL/TLS for database connections, storing credentials in a secrets manager rather than plain text in SQL files, and implementing network segmentation between Flink and your databases. Finally, if your organization allows it, seriously consider managed services like AWS Managed Flink, Confluent Cloud, or Azure Stream Analytics, which eliminate most of the operational burden of running Flink clusters yourself. The official documentation provides comprehensive guidance for production deployments: Apache Flink CDC Introduction.
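To illustrate the checkpoint externalization point, the relevant settings can also be supplied from the SQL client. A sketch assuming an S3-compatible bucket reachable from the cluster (the bucket name is an assumption; verify the configuration keys against your Flink version):
-- Durable checkpoint storage
SET 'state.checkpoints.dir' = 's3://flink-checkpoints/oracle-migration';
-- Keep the last checkpoint even if the job is cancelled
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';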
For example, in a migration project for an 800 GB Oracle database with around 1,500 tables and 4.8 billion rows, the VM hosting the Flink services had 16 cores and 48 GB of RAM. The initial incremental snapshot ran for 3.5 days at a throughput of 18,000 records/sec and 15k IOPS. Several pieces of automation had to be created, such as generating the pipelines for all tables and sequencing the transition from the initial load to the streaming phase while keeping the CPU cores busy.
What We’ve Learned
Through this guide, we’ve explored database migration with Flink CDC and learned several important lessons. On the technical side, start simple with snapshot mode first and add complexity like incremental or streaming CDC only when needed—don’t overengineer for a one-time migration. Understanding your bottleneck is critical because the tuning strategy differs completely depending on whether your pipeline is CPU-bound, IOPS-bound, or network-bound. The rewriteBatchedInserts=true parameter is magic for PostgreSQL, giving us a 5-10x improvement with a single setting, and parallelism has a sweet spot where more isn’t always better—we found 12 workers optimal before lock contention started hurting performance. Checkpointing is a trade-off between throughput and recovery time, with 45-60 seconds being optimal for migrations, and type mapping matters because incorrect Oracle → Flink → PostgreSQL conversions cause silent data corruption. Operationally, monitor everything using the Flink Web UI alongside source and target database metrics, test thoroughly on a test environment first because production surprises are expensive, have a rollback plan by keeping the source database running until cutover is verified, and document your tuning because each environment is different. Strategically, know when NOT to use Flink since simpler tools are better for small databases or same-technology migrations, factor in the operational complexity of maintaining another system, and consider cloud-managed Flink/CDC solutions if your organization allows it.
Conclusion
Flink CDC transforms database migrations from anxious “big bang” events into controlled, observable, and recoverable processes by combining real-time monitoring in the Flink Web UI, fault tolerance through checkpointing, configurable parallelism for performance, and transform capabilities in Flink SQL—making it a powerful tool for cross-technology migrations. We achieved a 65x throughput improvement (300 → 19,500 rec/sec) by understanding our bottlenecks and applying targeted optimizations, with the key insight being to identify whether you’re CPU-bound or IOPS-bound and tune accordingly. As with any tool, use it where it fits: for large, cross-technology migrations with near zero-downtime requirements, Flink CDC is excellent, but for small databases or simple same-technology copies, stick with native tools.