Introduction

When you want to migrate from Big Red to PostgreSQL, most of the time you can afford the downtime of an export/import process and start from something fresh. It is simple and reliable, and Ora2Pg is one of the go-to tools for that. But sometimes you cannot afford the downtime, either because the database is critical for business operations or because the database is too big for the export/import process.
Hence the following example of “logical replication” between Oracle and PostgreSQL using Flink CDC. I call it that even though it is really an event stream process, because for DBAs it has roughly the same limitations and constraints as standard logical replication.

Here is the layout:

Oracle Source → Flink CDC → JDBC Sink → PostgreSQL Target

This approach is based on production experience migrating large Oracle databases, where we achieved throughput of 19,500 records per second—a 65x improvement over our initial baseline. But more importantly, it transformed a “big bang” migration event into a controlled, observable, and recoverable process.

The geek in me says that Flink CDC is a powerful tool for migrations. The consultant says it should not be used blindly—it’s relevant for specific use cases where the benefits outweigh the operational complexity.


What Each Piece Does

  • Oracle (source): The source database. Flink CDC reads directly from tables via JDBC for snapshot mode, or from redo logs for streaming CDC mode.
  • Flink CDC source (Oracle): A Flink table that wraps the Debezium Oracle connector. It reads data and turns it into a changelog stream (insert/update/delete). Key options control snapshot mode, parallelism, and fetch sizes.
  • Flink runtime: Runs a streaming job that:
    • Snapshot: Reads current table state, optionally in parallel chunks
    • Checkpoints: State is stored so restarts resume exactly from the last acknowledged point
    • Transforms: You can filter, project, cast types, and even aggregate in Flink SQL
  • JDBC sink (PostgreSQL): Another Flink table. With a PRIMARY KEY defined, the connector performs UPSERT semantics (INSERT ... ON CONFLICT DO UPDATE in PostgreSQL). It writes in batches, flushes on checkpoints, and retries on transient errors.
  • PostgreSQL (target): Receives the stream and ends up with the migrated data. With proper tuning (especially reWriteBatchedInserts=true), it can handle high throughput.

Flink CDC connectors use Debezium which is an open-source platform for Change Data Capture that captures row-level changes in databases by reading transaction logs.

┌───────────────────────────────────────────────────────────────────────┐
│                        Flink CDC Architecture                         │
│                                                                       │
│  ┌──────────────┐    ┌────────────────────────────────┐    ┌────────┐ │
│  │    Oracle    │    │      Flink CDC Connector       │    │  Sink  │ │
│  │   Database   │    │  ┌─────────────────────────┐   │    │Database│ │
│  │              │───►│  │   Debezium (embedded)   │   │───►│        │ │
│  │  • Redo logs │    │  │   ─────────────────     │   │    │  Post- │ │
│  │  • Tables    │    │  │   • Oracle connector    │   │    │  greSQL│ │
│  │              │    │  │   • Log parsing         │   │    │        │ │
│  └──────────────┘    │  │   • Event streaming     │   │    └────────┘ │
│                      │  └─────────────────────────┘   │               │
│                      └────────────────────────────────┘               │
└───────────────────────────────────────────────────────────────────────┘

Why Debezium?

  • Log-based CDC: Reads database transaction logs, not polling tables—much lower overhead
  • Low impact: Minimal performance hit on source database
  • Exactly-once delivery: When combined with Flink’s checkpointing
  • Schema tracking: Handles schema evolution in streaming scenarios

Snapshot vs. CDC Modes

When you configure a Flink CDC source, you can choose:

  1. Snapshot Only: Read current table state (what we use in this demo)—fastest for one-time migrations
  2. Snapshot + CDC: Initial snapshot, then stream ongoing changes—for zero-downtime migrations
  3. CDC Only: Stream only new changes (requires existing snapshot)

    Note: The snapshot itself can be done within a single transaction (which can take a long time for big tables) or as an incremental snapshot. Since I am using Oracle Express Edition for this demo, I will stick with the normal snapshot. If you have big tables to load, Standard or Enterprise Edition is required for supplemental logging.

A Flink SQL migration pipeline has four distinct parts. Understanding each part is critical for troubleshooting and optimization.

Part 1: Runtime Configuration (SET Statements)

These settings control how the Flink job executes. Think of them as the “knobs” you turn to tune behavior:

-- Pipeline identification
SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';

-- Runtime mode: STREAMING for CDC, BATCH for one-time loads
SET 'execution.runtime-mode' = 'STREAMING';

-- Parallelism: how many workers process data concurrently
SET 'parallelism.default' = '4';

-- Checkpointing: how often Flink saves progress for recovery
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.timeout' = '10 min';
SET 'execution.checkpointing.min-pause' = '30 s';

-- Restart strategy: what happens on failure
SET 'restart-strategy.type' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '10 s';

Key points:

  • AT_LEAST_ONCE is faster than EXACTLY_ONCE for snapshot migrations where idempotency is guaranteed by upserts
  • Checkpoint interval affects both recovery granularity and overhead
  • Higher parallelism isn’t always better—you can hit contention on the target

Part 2: Source Table Definition (Oracle CDC)

This defines how Flink reads from Oracle. The column definitions must match your Oracle schema, using Flink SQL types:

DROP TABLE IF EXISTS src_customers;

CREATE TABLE src_customers (
    -- Column definitions must match Oracle schema
    -- Use Flink SQL types that map to Oracle types
    CUSTOMER_ID   DECIMAL(10,0),
    FIRST_NAME    STRING,
    LAST_NAME     STRING,
    EMAIL         STRING,
    PHONE         STRING,
    CREATED_AT    TIMESTAMP(6),
    STATUS        STRING,
    -- Primary key is required for CDC (NOT ENFORCED = Flink won't validate)
    PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
) WITH (
    -- Connector type: oracle-cdc (uses Debezium internally)
    'connector' = 'oracle-cdc',

    -- Oracle connection details
    'hostname' = 'oracle',
    'port' = '1521',
    'username' = 'demo',
    'password' = 'demo',

    -- Database configuration (pluggable database for Oracle XE)
    -- Use url to connect via service name instead of SID
    'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
    'database-name' = 'XEPDB1',
    'schema-name' = 'DEMO',
    'table-name' = 'CUSTOMERS',

    -- Snapshot mode: 'initial' = full snapshot, then stop (for snapshot-only)
    'scan.startup.mode' = 'initial',

    -- IMPORTANT: Disable incremental snapshot for this demo
    -- Incremental snapshot requires additional Oracle configuration
    'scan.incremental.snapshot.enabled' = 'false',

    -- Debezium snapshot configuration
    'debezium.snapshot.mode' = 'initial',
    'debezium.snapshot.fetch.size' = '10000'
);

Key concepts:

  • PRIMARY KEY NOT ENFORCED: Tells Flink the key exists but it won’t validate uniqueness
  • scan.incremental.snapshot.enabled: Set to false for simple snapshots; true requires Oracle archive log mode and supplemental logging
  • debezium.snapshot.fetch.size: How many rows to fetch per database round-trip—larger = fewer round-trips

Part 3: Sink Table Definition (PostgreSQL JDBC)

This defines how Flink writes to PostgreSQL:

DROP TABLE IF EXISTS sink_customers;

CREATE TABLE sink_customers (
    -- Column definitions for PostgreSQL target
    customer_id   BIGINT,
    first_name    STRING,
    last_name     STRING,
    email         STRING,
    phone         STRING,
    created_at    TIMESTAMP(6),
    status        STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    -- Connector type: jdbc
    'connector' = 'jdbc',

    -- PostgreSQL connection with batch optimization
    -- reWriteBatchedInserts=true is CRITICAL for performance (5-10x improvement)
    'url' = 'jdbc:postgresql://postgres:5432/demo?reWriteBatchedInserts=true',
    'table-name' = 'customers',
    'username' = 'demo',
    'password' = 'demo',
    'driver' = 'org.postgresql.Driver',

    -- Sink parallelism (tune based on target DB capacity)
    -- Too high can cause contention; 4-8 is usually optimal
    'sink.parallelism' = '4',

    -- Buffer configuration for throughput
    'sink.buffer-flush.max-rows' = '10000',
    'sink.buffer-flush.interval' = '5 s',

    -- Retry configuration
    'sink.max-retries' = '3'
);

Key optimization: reWriteBatchedInserts=true is critical for PostgreSQL performance (the pgJDBC documentation spells the property with a capital W). It tells the JDBC driver to rewrite a batch of single-row INSERT statements into one multi-row INSERT:

Without this:

INSERT INTO t (a,b) VALUES (1,'x');
INSERT INTO t (a,b) VALUES (2,'y');
INSERT INTO t (a,b) VALUES (3,'z');

With reWriteBatchedInserts=true:

INSERT INTO t (a,b) VALUES (1,'x'),(2,'y'),(3,'z');

This single change gave us a 5-10x throughput improvement in production.
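
To make the rewrite concrete, here is a minimal Python sketch of the transformation. It only mimics the effect shown above using string formatting; it is not how the pgJDBC driver is implemented (the driver works on prepared statements with bind parameters), and the function name rewrite_batched_inserts is made up for this illustration.

```python
def rewrite_batched_inserts(table, columns, rows):
    """Collapse many single-row INSERTs into one multi-row INSERT (illustration only)."""

    def literal(value):
        # Naive literal rendering for the demo; real drivers use bind parameters.
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    values = ",".join(
        "(" + ",".join(literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} ({','.join(columns)}) VALUES {values};"


statement = rewrite_batched_inserts("t", ["a", "b"], [(1, "x"), (2, "y"), (3, "z")])
print(statement)
```

The point of the sketch is the shape of the result: three statements become one, so the target parses and executes a single statement for the whole batch.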

Part 4: Data Flow (INSERT…SELECT)

This starts the actual data migration. The CAST operations convert Oracle types to PostgreSQL types:

INSERT INTO sink_customers
SELECT
    CAST(CUSTOMER_ID AS BIGINT) AS customer_id,
    FIRST_NAME AS first_name,
    LAST_NAME AS last_name,
    EMAIL AS email,
    PHONE AS phone,
    CREATED_AT AS created_at,
    STATUS AS status
FROM src_customers;

This single statement:

  1. Reads from the Oracle source table
  2. Transforms data types (CAST operations)
  3. Writes to the PostgreSQL sink table
  4. Handles batching, parallelism, and fault tolerance automatically
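
When many tables follow the same pattern, the INSERT…SELECT statement can be generated instead of written by hand. The following Python sketch is one possible way to do that; the helper name build_insert_select and the (column, cast) input format are assumptions made for this example, not part of Flink.

```python
def build_insert_select(source_table, sink_table, columns):
    """Build a Flink SQL INSERT...SELECT.

    columns: list of (oracle_column, cast_type_or_None) pairs; the sink column
    name is assumed to be the lower-cased Oracle column name.
    """
    select_items = []
    for name, cast_type in columns:
        expr = f"CAST({name} AS {cast_type})" if cast_type else name
        select_items.append(f"    {expr} AS {name.lower()}")
    return (
        f"INSERT INTO {sink_table}\n"
        "SELECT\n"
        + ",\n".join(select_items)
        + f"\nFROM {source_table};"
    )


customers = [
    ("CUSTOMER_ID", "BIGINT"),  # NUMBER(10) on the Oracle side
    ("FIRST_NAME", None),
    ("LAST_NAME", None),
    ("EMAIL", None),
    ("PHONE", None),
    ("CREATED_AT", None),
    ("STATUS", None),
]
sql = build_insert_select("src_customers", "sink_customers", customers)
print(sql)
```

Fed with the seven CUSTOMERS columns, this prints the same statement as the one shown above.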

Complete SQL Example

Here is the complete migration pipeline that you can run in Flink SQL Client. This is production-ready code with all the optimizations we’ve discussed:

-- =============================================================================
-- Flink CDC Pipeline: Migrate CUSTOMERS table (Oracle -> PostgreSQL)
-- =============================================================================
-- Mode: Snapshot-only (no incremental, no streaming)
-- Source: Oracle XE 21c
-- Target: PostgreSQL 18
-- =============================================================================

-- ============================================================================
-- PART 1: Runtime Configuration (SET statements)
-- ============================================================================
-- These settings control how the Flink job executes

SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';
SET 'execution.runtime-mode' = 'STREAMING';
SET 'parallelism.default' = '4';

-- Checkpointing configuration
-- AT_LEAST_ONCE is faster for snapshot/migration workloads
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.timeout' = '10 min';
SET 'execution.checkpointing.min-pause' = '30 s';

-- Restart strategy for fault tolerance
SET 'restart-strategy.type' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '10 s';

-- ============================================================================
-- PART 2: Source Table Definition (Oracle CDC)
-- ============================================================================
-- This defines how Flink reads from Oracle using Debezium under the hood

DROP TABLE IF EXISTS src_customers;

CREATE TABLE src_customers (
    -- Column definitions must match Oracle schema
    -- Use Flink SQL types that map to Oracle types
    CUSTOMER_ID   DECIMAL(10,0),
    FIRST_NAME    STRING,
    LAST_NAME     STRING,
    EMAIL         STRING,
    PHONE         STRING,
    CREATED_AT    TIMESTAMP(6),
    STATUS        STRING,
    -- Primary key is required for CDC (NOT ENFORCED = Flink won't validate)
    PRIMARY KEY (CUSTOMER_ID) NOT ENFORCED
) WITH (
    -- Connector type: oracle-cdc (uses Debezium internally)
    'connector' = 'oracle-cdc',

    -- Oracle connection details
    'hostname' = 'oracle',
    'port' = '1521',
    'username' = 'demo',
    'password' = 'demo',

    -- Database configuration (pluggable database for Oracle XE)
    -- Use url to connect via service name instead of SID
    'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
    'database-name' = 'XEPDB1',
    'schema-name' = 'DEMO',
    'table-name' = 'CUSTOMERS',

    -- Snapshot mode: 'initial' = full snapshot, then stop (for snapshot-only)
    'scan.startup.mode' = 'initial',

    -- IMPORTANT: Disable incremental snapshot for this demo
    -- Incremental snapshot requires additional Oracle configuration
    'scan.incremental.snapshot.enabled' = 'false',

    -- Debezium snapshot configuration
    'debezium.snapshot.fetch.size' = '10000'
);

-- ============================================================================
-- PART 3: Sink Table Definition (PostgreSQL JDBC)
-- ============================================================================
-- This defines how Flink writes to PostgreSQL

DROP TABLE IF EXISTS sink_customers;

CREATE TABLE sink_customers (
    -- Column definitions for PostgreSQL target
    customer_id   BIGINT,
    first_name    STRING,
    last_name     STRING,
    email         STRING,
    phone         STRING,
    created_at    TIMESTAMP(6),
    status        STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    -- Connector type: jdbc
    'connector' = 'jdbc',

    -- PostgreSQL connection with batch optimization
    -- reWriteBatchedInserts=true is CRITICAL for performance (5-10x improvement)
    'url' = 'jdbc:postgresql://postgres:5432/demo?reWriteBatchedInserts=true',
    'table-name' = 'customers',
    'username' = 'demo',
    'password' = 'demo',
    'driver' = 'org.postgresql.Driver',

    -- Sink parallelism (tune based on target DB capacity)
    -- Too high can cause contention; 4-8 is usually optimal
    'sink.parallelism' = '4',

    -- Buffer configuration for throughput
    'sink.buffer-flush.max-rows' = '10000',
    'sink.buffer-flush.interval' = '5 s',

    -- Retry configuration
    'sink.max-retries' = '3'
);

-- ============================================================================
-- PART 4: Data Flow (INSERT...SELECT)
-- ============================================================================
-- This starts the actual data migration
-- CAST operations convert Oracle types to PostgreSQL types

INSERT INTO sink_customers
SELECT
    CAST(CUSTOMER_ID AS BIGINT) AS customer_id,
    FIRST_NAME AS first_name,
    LAST_NAME AS last_name,
    EMAIL AS email,
    PHONE AS phone,
    CREATED_AT AS created_at,
    STATUS AS status
FROM src_customers;

LAB Setup

In my lab I am using PostgreSQL 18 and Oracle XE Docker containers, plus Flink JobManager and TaskManager containers, with the following definition:

Create a docker-compose.yml:

services:
  # Oracle Database 21c XE (Source)
  oracle:
    image: gvenzl/oracle-xe:21-slim-faststart
    container_name: oracle-demo
    environment:
      ORACLE_PASSWORD: OracleDemo123
      APP_USER: demo
      APP_USER_PASSWORD: demo
    ports:
      - "1521:1521"
    volumes:
      - oracle-data:/opt/oracle/oradata
      - ./oracle-init:/container-entrypoint-initdb.d
    healthcheck:
      test: ["CMD", "healthcheck.sh"]
      interval: 30s
      timeout: 10s
      retries: 10
      start_period: 120s
    networks:
      - flink-network

  # PostgreSQL 18 (Target)
  postgres:
    image: postgres:18
    container_name: postgres-demo
    environment:
      POSTGRES_USER: demo
      POSTGRES_PASSWORD: demo
      POSTGRES_DB: demo
    ports:
      - "5432:5432"
    volumes:
      - postgres-data:/var/lib/postgresql
      - ./postgres-init:/docker-entrypoint-initdb.d
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U demo -d demo"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - flink-network

  # Flink Job Manager
  flink-jobmanager:
    build:
      context: ./flink
      dockerfile: Dockerfile
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        jobmanager.memory.process.size: 1600m
        parallelism.default: 4
        state.backend.type: rocksdb
    volumes:
      - ./pipelines:/opt/flink/pipelines
    networks:
      - flink-network
    depends_on:
      oracle:
        condition: service_healthy
      postgres:
        condition: service_healthy

  # Flink Task Manager
  flink-taskmanager:
    build:
      context: ./flink
      dockerfile: Dockerfile
    container_name: flink-taskmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.memory.process.size: 2048m
        taskmanager.numberOfTaskSlots: 8
    networks:
      - flink-network
    depends_on:
      - flink-jobmanager

volumes:
  oracle-data:
  postgres-data:

networks:
  flink-network:
    driver: bridge

Create flink/Dockerfile:

FROM flink:1.20.3-scala_2.12-java11

# Download Flink CDC connector for Oracle
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-oracle-cdc/3.5.0/flink-sql-connector-oracle-cdc-3.5.0.jar

# Download JDBC connector
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar

# Download PostgreSQL JDBC driver
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.4/postgresql-42.7.4.jar

# Download Oracle JDBC driver
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc11/23.5.0.24.07/ojdbc11-23.5.0.24.07.jar

Access the Flink Web UI at: http://localhost:8081


Running the Migration

Let’s execute the actual migration with full command outputs.

Step 1: Verify Source Data (Before Migration)

$ docker exec oracle-demo bash -c "echo 'SELECT COUNT(*) FROM customers;' | \
    sqlplus -s demo/demo@//localhost:1521/XEPDB1"

  COUNT(*)
----------
     10000

Step 2: Verify Target is Empty (Before Migration)

$ docker exec postgres-demo psql -U demo -d demo -c "SELECT COUNT(*) FROM customers;"

 count
-------
     0
(1 row)

Step 3: Run the Migration Pipeline

$ docker exec flink-jobmanager /opt/flink/bin/sql-client.sh \
    -f /opt/flink/pipelines/migrate-customers.sql

[INFO] Executing SQL from file.
Flink SQL> SET 'pipeline.name' = 'Oracle-to-PostgreSQL: CUSTOMERS Migration';
[INFO] Execute statement succeeded.
...
Flink SQL> INSERT INTO sink_customers SELECT ...
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c554d99dce69b084607080502c13ffca

Step 4: Monitor Progress

Check the Flink Web UI at http://localhost:8081 or use the REST API:

$ curl -s http://localhost:8081/jobs | jq '.jobs[] | select(.status == "RUNNING" or .status == "FINISHED")'
{
  "id": "c554d99dce69b084607080502c13ffca",
  "status": "FINISHED"
}
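
If you prefer to script the check, the same filter can be written in Python with only the standard library. This is a sketch under the assumption that the /jobs response has the shape shown above (a "jobs" array of objects with "id" and "status"); the function names are invented for this example.

```python
import json
import urllib.request


def fetch_jobs(base_url="http://localhost:8081"):
    """Call the /jobs endpoint used above and return the decoded JSON payload."""
    with urllib.request.urlopen(f"{base_url}/jobs", timeout=10) as response:
        return json.load(response)


def jobs_with_status(payload, wanted=("RUNNING", "FINISHED")):
    """Same filter as the jq expression: keep jobs whose status is in `wanted`."""
    return [job for job in payload["jobs"] if job["status"] in wanted]


# Offline example using the response shape shown above (no cluster needed).
sample = json.loads(
    '{"jobs": [{"id": "c554d99dce69b084607080502c13ffca", "status": "FINISHED"}]}'
)
print(jobs_with_status(sample))
```

Against a running cluster you would call jobs_with_status(fetch_jobs()) instead of using the sample payload.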

Step 5: Verify Migration (After)

$ docker exec postgres-demo psql -U demo -d demo -c "SELECT COUNT(*) FROM customers;"

 count
-------
 10000
(1 row)

$ docker exec postgres-demo psql -U demo -d demo -c "SELECT * FROM customers LIMIT 3;"

 customer_id |  first_name   |  last_name   |          email           |    phone
-------------+---------------+--------------+--------------------------+-------------
        8836 | FirstName8836 | LastName8836 | [email protected] | +1-555-8836
        4740 | FirstName4740 | LastName4740 | [email protected] | +1-555-4740
        8835 | FirstName8835 | LastName8835 | [email protected] | +1-555-8835
(3 rows)

Type Mapping Reference

Before migrating data, you need to understand the type conversions. Here’s a reference:

 Oracle       | Flink SQL     | PostgreSQL    | Notes
--------------+---------------+---------------+--------------------------
 NUMBER(10)   | DECIMAL(10,0) | BIGINT        | Use CAST in SELECT
 NUMBER(12,2) | DECIMAL(12,2) | NUMERIC(12,2) | Direct mapping
 VARCHAR2(n)  | STRING        | VARCHAR(n)    | Direct mapping
 DATE         | TIMESTAMP(6)  | TIMESTAMP     | Oracle DATE includes time
 TIMESTAMP    | TIMESTAMP(6)  | TIMESTAMP     | Direct mapping
 CLOB         | STRING        | TEXT          | Large text
 BLOB         | BYTES         | BYTEA         | Binary data
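
When there are many tables, it helps to encode this reference once and apply it mechanically. The Python sketch below covers only the rows of the reference above. The function name map_oracle_type is made up, and the rule that whole numbers of more than 18 digits go to NUMERIC rather than BIGINT is my own assumption, added because BIGINT cannot hold every 19-digit value.

```python
import re

# Oracle base type -> (Flink SQL type, PostgreSQL type), copied from the reference above.
FIXED_TYPES = {
    "DATE": ("TIMESTAMP(6)", "TIMESTAMP"),
    "TIMESTAMP": ("TIMESTAMP(6)", "TIMESTAMP"),
    "CLOB": ("STRING", "TEXT"),
    "BLOB": ("BYTES", "BYTEA"),
}


def map_oracle_type(oracle_type):
    """Return (flink_type, postgres_type) for one Oracle column type."""
    t = oracle_type.strip().upper()

    number = re.fullmatch(r"NUMBER\((\d+)(?:\s*,\s*(\d+))?\)", t)
    if number:
        precision = int(number.group(1))
        scale = int(number.group(2) or 0)
        if scale == 0 and precision <= 18:
            # Assumption: integers of up to 18 digits always fit in BIGINT.
            return (f"DECIMAL({precision},0)", "BIGINT")
        return (f"DECIMAL({precision},{scale})", f"NUMERIC({precision},{scale})")

    varchar = re.fullmatch(r"VARCHAR2\((\d+)\)", t)
    if varchar:
        return ("STRING", f"VARCHAR({varchar.group(1)})")

    if t in FIXED_TYPES:
        return FIXED_TYPES[t]
    raise ValueError(f"no mapping defined for {oracle_type!r}")


for oracle_type in ("NUMBER(10)", "NUMBER(12,2)", "VARCHAR2(50)", "DATE"):
    print(oracle_type, "->", map_oracle_type(oracle_type))
```

Raising on unknown types is deliberate: a loud failure during pipeline generation is cheaper than a silent wrong conversion later.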

Performance Optimization: What We Learned

Based on production experience, here are the key optimizations that improved throughput from 300 rec/sec to 19,500 rec/sec (65x improvement).

Understanding CPU-Bound vs. IOPS-Bound Pipelines

Before tuning, you need to understand what’s limiting your pipeline. This is critical because the solutions are different:

CPU-Bound Pipeline:

  • Symptoms: High CPU usage on Flink Task Manager, low disk I/O on target database
  • Cause: Complex transformations, serialization/deserialization overhead, too few parallel workers
  • Solution: Increase parallelism, simplify transformations, use more Task Manager slots

IOPS-Bound Pipeline:

  • Symptoms: Low CPU usage on Flink, high disk I/O or lock contention on target database
  • Cause: Too many small writes, target database bottleneck, excessive parallelism causing lock contention
  • Solution: Larger batch sizes, reWriteBatchedInserts=true, reduce sink parallelism, tune target database

Network-Bound Pipeline:

  • Symptoms: High network wait times, gaps between source reads and sink writes
  • Cause: Small fetch sizes, high latency between Flink and databases
  • Solution: Larger fetch sizes, co-locate components when possible

How to Identify Your Bottleneck

In the Flink Web UI, look at:

  1. Backpressure indicators: Red/yellow backpressure on source = sink can’t keep up (IOPS-bound)
  2. Records sent/received: Compare source output rate vs. sink input rate
  3. Checkpoint duration: Long checkpoints often indicate IOPS issues on state backend
  4. Task Manager metrics: CPU%, memory usage, GC pauses

On your databases:

-- Oracle: Check redo log generation rate
SELECT * FROM V$SYSSTAT WHERE NAME LIKE '%redo%';

-- PostgreSQL: Check write activity
SELECT * FROM pg_stat_bgwriter;
SELECT * FROM pg_stat_checkpointer;  -- PostgreSQL 17+: checkpoint statistics live in this view
SELECT * FROM pg_stat_database WHERE datname = 'demo';

Critical Optimizations

1. JDBC Batch Rewriting (5-10x Improvement)

The single most impactful optimization for IOPS-bound pipelines:

'url' = 'jdbc:postgresql://host/db?reWriteBatchedInserts=true'

This is so important I’ll repeat it: this single parameter gave us 5-10x throughput improvement. Without it, every row is a separate INSERT statement. With it, rows are batched into efficient multi-row INSERTs.

2. Sink Parallelism (2-4x Improvement)

More workers can process more data—but there’s a sweet spot:

'sink.parallelism' = '12'

Our testing showed:

 Parallelism | Throughput     | Notes
-------------+----------------+---------------------
 1           | 5,000 rec/sec  | Baseline
 4           | 12,000 rec/sec | Good improvement
 8           | 17,000 rec/sec | Still scaling
 12          | 19,500 rec/sec | Sweet spot
 16          | 18,000 rec/sec | Contention starts
 24          | 15,000 rec/sec | Too much contention

Why does too much parallelism hurt? Lock contention on the target database. Each parallel writer tries to acquire locks, and beyond a certain point, they spend more time waiting than writing.
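
The sweet spot can be read straight from the measurements. The short Python sketch below uses the throughput figures reported above and also prints the throughput per writer, which makes the diminishing returns visible. The variable names are arbitrary.

```python
# Sink parallelism -> measured throughput in records/sec (figures from our testing above).
measurements = {1: 5_000, 4: 12_000, 8: 17_000, 12: 19_500, 16: 18_000, 24: 15_000}

# The sweet spot is simply the parallelism with the highest total throughput.
best = max(measurements, key=measurements.get)
speedup = measurements[best] / measurements[1]

for parallelism, rate in measurements.items():
    # Per-writer throughput drops as writers start competing on the target.
    print(f"{parallelism:>2} writers: {rate:>6} rec/sec total, {rate / parallelism:>7.1f} per writer")

print(f"sweet spot: {best} writers, {speedup:.1f}x over a single writer")
```

Each writer contributes less as parallelism grows (5,000 rec/sec alone, 1,625 rec/sec each at 12 writers), and past 12 writers the total itself goes down.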

3. Buffer Size Tuning

Larger buffers = fewer flushes = better throughput (at cost of memory and latency):

'sink.buffer-flush.max-rows' = '50000'
'sink.buffer-flush.interval' = '10 s'

For IOPS-bound pipelines, larger buffers are critical. For CPU-bound pipelines, smaller buffers with higher parallelism may be better.

4. Source Fetch Size

Reduce round-trips to the source database:

-- For JDBC connector:
'scan.fetch-size' = '20000'

-- For CDC connector:
'debezium.snapshot.fetch.size' = '20000'

Larger fetch sizes reduce network overhead but increase memory usage. Find your balance based on available memory.

5. Checkpointing Mode

For migrations (where exactly-once is less critical):

SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';

AT_LEAST_ONCE is faster than EXACTLY_ONCE because it doesn’t require barriers to align data across all parallel paths. Since our sink uses upserts (INSERT ON CONFLICT), duplicate processing is idempotent anyway.

6. Checkpoint Interval

Longer intervals = less overhead, but longer recovery time on failure:

SET 'execution.checkpointing.interval' = '60 s';

For our production migrations, 45-60 seconds was optimal. Shorter intervals caused excessive state backend I/O (another IOPS consideration).
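
To get a feel for the recovery side of that trade-off, here is a rough back-of-the-envelope calculation in Python. It assumes a constant throughput and that, after a failure, roughly one checkpoint interval of work is replayed; the real figure also depends on how long the checkpoint itself takes, so treat it as an order of magnitude only. The function name is made up.

```python
def approx_replayed_rows(throughput_rows_per_sec, checkpoint_interval_sec):
    """Rough number of rows written since the last checkpoint at constant throughput."""
    return throughput_rows_per_sec * checkpoint_interval_sec


for interval in (45, 60):
    rows = approx_replayed_rows(19_500, interval)
    print(f"{interval}s interval -> roughly {rows:,} rows replayed after a failure")
```

At 19,500 rec/sec that is on the order of one million rows upserted a second time, which is harmless with an idempotent sink but still costs target I/O.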

Performance Reference

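 Setting               | Baseline     | Optimized      | Impact
-----------------------+--------------+----------------+----------
 reWriteBatchedInserts | false        | true           | 5-10x
 sink.parallelism      | 1            | 12             | 2-4x
 buffer-flush.max-rows | 1000         | 50000          | 1.5-2x
 fetch-size            | 1000         | 20000          | 1.3-1.5x
 checkpoint.mode       | EXACTLY_ONCE | AT_LEAST_ONCE  | 1.2-1.3x
 Combined Throughput   | 300 rec/sec  | 19,500 rec/sec | 65x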
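
As a sanity check on these figures, the per-setting impacts can be multiplied together and compared with the measured end-to-end gain. The Python sketch below does that. Multiplying the ranges is a simplification on my part, since the individual gains are not independent in practice.

```python
import math

# Per-setting impact ranges (low, high) taken from the reference above.
impact_ranges = {
    "reWriteBatchedInserts": (5.0, 10.0),
    "sink.parallelism": (2.0, 4.0),
    "buffer-flush.max-rows": (1.5, 2.0),
    "fetch-size": (1.3, 1.5),
    "checkpoint.mode": (1.2, 1.3),
}

# Naive assumption: the individual gains multiply.
low = math.prod(lo for lo, _ in impact_ranges.values())
high = math.prod(hi for _, hi in impact_ranges.values())

# Measured end-to-end improvement: 300 -> 19,500 records/sec.
measured = 19_500 / 300

print(f"naive combined range: {low:.1f}x to {high:.1f}x, measured: {measured:.0f}x")
```

The measured 65x falls inside the naive 23.4x to 156x range, so the individual figures and the combined result are at least consistent with each other.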

Real-World Tuning Process

Here’s how I approach tuning a new migration:

  1. Start with defaults: Run the pipeline and observe behavior in Flink UI
  2. Identify the bottleneck: Is it CPU, IOPS, or network?
  3. Apply the biggest lever first: Usually reWriteBatchedInserts=true for PostgreSQL
  4. Increase parallelism gradually: Watch for the point where throughput stops improving
  5. Tune batch sizes: Larger for IOPS-bound, smaller for CPU-bound
  6. Monitor the target database: Watch for lock contention, checkpoint lag, WAL accumulation
  7. Document your findings: Each environment is different; what works for one may not work for another

Incremental Snapshot for Large Databases

For databases larger than ~100GB, incremental snapshot mode is essential. Instead of reading entire tables at once (which can cause locks and memory issues), incremental snapshot divides tables into chunks.

What is Incremental Snapshot?

┌─────────────────────────────────────────────────────────────────┐
│                   Incremental Snapshot                          │
│                                                                 │
│  Table (1M rows, chunked by ID):                                │
│                                                                 │
│  ┌───────┬───────┬───────┬───────┬───────┐                      │
│  │ Chunk │ Chunk │ Chunk │ Chunk │ Chunk │                      │
│  │  1    │  2    │  3    │  4    │  5    │  ...                 │
│  │ 1-200K│200K-  │400K-  │600K-  │800K-  │                      │
│  │       │ 400K  │ 600K  │ 800K  │ 1M    │                      │
│  └───┬───┴───┬───┴───┬───┴───┬───┴───┬───┘                      │
│      │       │       │       │                                  │
│      ▼       ▼       ▼       ▼                                  │
│   Process in parallel, no table locks                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
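
The chunking idea in the diagram can be sketched in a few lines of Python. This is an illustration of splitting a numeric key range into fixed-size chunks, not the splitting algorithm Flink CDC actually uses; the function name split_into_chunks is hypothetical.

```python
def split_into_chunks(min_key, max_key, chunk_size):
    """Split the inclusive key range [min_key, max_key] into consecutive chunks."""
    chunks = []
    start = min_key
    while start <= max_key:
        end = min(start + chunk_size - 1, max_key)
        chunks.append((start, end))
        start = end + 1
    return chunks


# 1M rows keyed 1..1,000,000, chunked by 200,000 as in the diagram.
for low, high in split_into_chunks(1, 1_000_000, 200_000):
    # Each chunk maps to an independent range query that a worker can run on its own.
    print(f"SELECT ... WHERE ID BETWEEN {low} AND {high}")
```

Because the chunks do not overlap and together cover the whole key range, they can be read in parallel and a failed chunk can be retried without restarting the whole table.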

Oracle Requirements

Incremental snapshot with CDC requires additional Oracle configuration:

  1. Archive Log Mode: Must be enabled

     -- Check current mode
     SELECT LOG_MODE FROM V$DATABASE;

     -- Enable (requires DB restart)
     SHUTDOWN IMMEDIATE;
     STARTUP MOUNT;
     ALTER DATABASE ARCHIVELOG;
     ALTER DATABASE OPEN;

  2. Supplemental Logging:

     ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
     ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;

  3. LogMiner Privileges for the CDC user:

     GRANT SELECT ON V_$DATABASE TO cdc_user;
     GRANT SELECT ON V_$LOG TO cdc_user;
     GRANT SELECT ON V_$LOGFILE TO cdc_user;
     GRANT SELECT ON V_$ARCHIVED_LOG TO cdc_user;
     GRANT EXECUTE ON DBMS_LOGMNR TO cdc_user;
     GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc_user;
     GRANT SELECT ON V_$LOGMNR_LOGS TO cdc_user;
     GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc_user;
     GRANT FLASHBACK ANY TABLE TO cdc_user;

Pipeline Configuration

CREATE TABLE src_large_table (...) WITH (
    'connector' = 'oracle-cdc',
    'url' = 'jdbc:oracle:thin:@//oracle:1521/XEPDB1',
    'database-name' = 'XEPDB1',
    'schema-name' = 'DEMO',
    'table-name' = 'LARGE_TABLE',

    -- Enable incremental snapshot
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.incremental.snapshot.chunk.size' = '100000',
    'scan.incremental.snapshot.chunk.key-column' = 'ID',

    -- Debezium settings
    'debezium.snapshot.fetch.size' = '20000'
);

When to Use Incremental Snapshot

 Database Size        | Recommendation
----------------------+----------------------------------
 < 10 GB              | Standard snapshot (JDBC)
 10-100 GB            | Either approach works
 > 100 GB             | Incremental snapshot required
 Active production DB | Incremental snapshot recommended

Production Implementation Advice

Before taking this approach to production, there are several considerations to keep in mind:

  1. Deployment: This lab setup runs Flink in standalone mode, which is fine for testing but lacks persistence: if you restart the Flink processes, your pipelines are lost. For production, you’ll want to deploy on Kubernetes using the official Flink Kubernetes Operator, which provides proper state management, automatic recovery, and horizontal scaling.
  2. Version compatibility: Not all latest versions of Flink, CDC connectors, and JDBC drivers work together. I learned this the hard way, so check the compatibility matrix before building your stack and stick with LTS versions like Flink 1.20 when possible.
  3. Checkpoint storage: Externalize your checkpoints to durable storage like S3, MinIO, or HDFS rather than the local filesystem, as this enables true fault tolerance and job recovery across restarts.
  4. Monitoring: Connect Flink’s metrics to Prometheus and Grafana, and set up alerts for checkpoint failures, backpressure, and throughput drops. The Web UI is great for debugging but not for 24/7 operations.
  5. Security: Use SSL/TLS for database connections, store credentials in a secrets manager rather than in plain text in SQL files, and implement network segmentation between Flink and your databases.
  6. Managed services: If your organization allows it, seriously consider managed services like AWS Managed Flink, Confluent Cloud, or Azure Stream Analytics, which eliminate most of the operational burden of running Flink clusters yourself.

The official documentation provides comprehensive guidance for production deployments: Apache Flink CDC Introduction.

As an example, in a migration project for an 800 GB Oracle database with around 1,500 tables and 4.8 billion rows, the VM hosting the Flink services had 16 cores and 48 GB of RAM. The initial incremental snapshot lasted 3.5 days, with a throughput of 18,000 records/sec and 15k IOPS. Several pieces of automation had to be built, such as generating the pipelines for all tables and moving sequentially from the initial load to the streaming phase while keeping the CPU cores busy.
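
These numbers can be cross-checked with a simple estimate, which is also a handy way to size a migration window up front. The Python sketch below assumes a perfectly constant throughput, which no real migration has; the function name is made up.

```python
def estimated_days(total_rows, rows_per_sec):
    """Ideal duration in days for loading total_rows at a constant rate."""
    seconds = total_rows / rows_per_sec
    return seconds / 86_400  # seconds per day


days = estimated_days(4_800_000_000, 18_000)
print(f"{days:.2f} days at a constant 18,000 rows/sec")
```

The ideal figure is about 3.1 days, against the 3.5 days observed. A gap of that size is what you would expect when throughput is not constant across 1,500 tables, so budget a margin on top of the ideal estimate.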


What We’ve Learned

Through this guide, we’ve explored database migration with Flink CDC and learned several important lessons.

  • Technical: Start simple with snapshot mode first and add complexity like incremental or streaming CDC only when needed; don’t overengineer for a one-time migration. Understanding your bottleneck is critical because the tuning strategy differs completely depending on whether your pipeline is CPU-bound, IOPS-bound, or network-bound. The reWriteBatchedInserts=true parameter is magic for PostgreSQL, giving us a 5-10x improvement with a single setting. Parallelism has a sweet spot where more isn’t always better: we found 12 workers optimal before lock contention started hurting performance. Checkpointing is a trade-off between throughput and recovery time, with 45-60 seconds being optimal for our migrations. Type mapping matters because incorrect Oracle → Flink → PostgreSQL conversions cause silent data corruption.
  • Operational: Monitor everything using the Flink Web UI alongside source and target database metrics. Test thoroughly in a test environment first because production surprises are expensive. Have a rollback plan by keeping the source database running until cutover is verified. Document your tuning because each environment is different.
  • Strategic: Know when NOT to use Flink, since simpler tools are better for small databases or same-technology migrations. Factor in the operational complexity of maintaining another system, and consider cloud-managed Flink/CDC solutions if your organization allows it.

Conclusion

Flink CDC transforms database migrations from anxious “big bang” events into controlled, observable, and recoverable processes by combining real-time monitoring in the Flink Web UI, fault tolerance through checkpointing, configurable parallelism for performance, and transform capabilities in Flink SQL—making it a powerful tool for cross-technology migrations. We achieved a 65x throughput improvement (300 → 19,500 rec/sec) by understanding our bottlenecks and applying targeted optimizations, with the key insight being to identify whether you’re CPU-bound or IOPS-bound and tune accordingly. As with any tool, use it where it fits: for large, cross-technology migrations with near zero-downtime requirements, Flink CDC is excellent, but for small databases or simple same-technology copies, stick with native tools.


Resources