This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Merge pull request #2 from judahrand/replica-read
Add use_replica config flag
judahrand authored Jan 18, 2022
2 parents 9cf0839 + d2d9aa6 commit 595813b
Showing 10 changed files with 251 additions and 150 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yaml
@@ -35,9 +35,11 @@ jobs:

- name: Tests
env:
TAP_POSTGRES_HOST: localhost
TAP_POSTGRES_PORT: 5432
TAP_POSTGRES_USER: test_user
TAP_POSTGRES_PASSWORD: my-secret-passwd
TAP_POSTGRES_HOST: localhost
TAP_POSTGRES_SECONDARY_HOST: localhost
TAP_POSTGRES_SECONDARY_PORT: 5433
LOGGING_CONF_FILE: ./sample_logging.conf
run: make test
2 changes: 1 addition & 1 deletion Makefile
@@ -9,7 +9,7 @@ pylint:
pylint --rcfile .pylintrc --disable duplicate-code tap_postgres/

start_db:
docker-compose up -d --build db
docker-compose up -d

test:
. ./venv/bin/activate ;\
11 changes: 8 additions & 3 deletions README.md
@@ -66,6 +66,9 @@ Full list of options in `config.json`:
| tap_id | String | No | ID of the pipeline/tap (Default: None) |
| itersize | Integer | No | Size of PG cursor iterator when doing INCREMENTAL or FULL_TABLE (Default: 20000) |
| default_replication_method | String | No | Default replication method to use when no one is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) |
| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default : False) |
| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) |
| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) |
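
As a minimal sketch of how the three new options fit together in `config.json`, the Python snippet below builds and serializes a configuration. The hostnames, ports and credentials are placeholders for illustration only, and the connection keys other than the three new ones (`host`, `port`, `user`, `password`, `dbname`) are assumed here rather than taken from this diff.

```python
import json

# Placeholder values for illustration only; adjust to your environment.
config = {
    "host": "primary.example.internal",
    "port": 5432,
    "user": "test_user",
    "password": "my-secret-passwd",
    "dbname": "tap_postgres_test",
    # New in this change: route INCREMENTAL and FULL_TABLE reads to a replica.
    "use_secondary": True,
    "secondary_host": "replica.example.internal",
    "secondary_port": 5433,
}

# Serialize to the JSON text that would be saved as config.json.
config_json = json.dumps(config, indent=2)
print(config_json)
```

With `use_secondary` left out or set to `false`, the two `secondary_*` keys are not required.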


### Run the tap in Discovery Mode
@@ -142,7 +145,7 @@ to the tap for the next sync.
```
Restart your PostgreSQL service to ensure the changes take effect.
**Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5.
This should be sufficient unless you have a large number of read replicas connected to the master instance.
@@ -151,11 +154,11 @@ to the tap for the next sync.
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a
client in the order they were made on the original server. Each slot streams a sequence of changes from a single
database.
Login to the master instance as a superuser and using the `wal2json` plugin, create a logical replication slot:
```
SELECT *
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
FROM pg_create_logical_replication_slot('pipelinewise_<database_name>', 'wal2json');
```
**Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple
@@ -172,6 +175,8 @@ to the tap for the next sync.
```
export TAP_POSTGRES_HOST=<postgres-host>
export TAP_POSTGRES_PORT=<postgres-port>
export TAP_POSTGRES_SECONDARY_HOST=<postgres-replica-host>
export TAP_POSTGRES_SECONDARY_PORT=<postgres-replica-port>
export TAP_POSTGRES_USER=<postgres-user>
export TAP_POSTGRES_PASSWORD=<postgres-password>
```
28 changes: 23 additions & 5 deletions docker-compose.yml
@@ -1,13 +1,31 @@
version: "3.3"

services:
db:
image: "debezium/postgres:12-alpine"
container_name: ""
db_primary:
image: "docker.io/bitnami/postgresql:12"
container_name: "primary"
ports:
- "5432:5432"
environment:
- POSTGRESQL_REPLICATION_MODE=master
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRES_USER=test_user
- POSTGRES_PASSWORD=my-secret-passwd
- POSTGRES_DB=tap_postgres_test
command: -c "wal_level=logical" -c "max_replication_slots=5" -c "max_wal_senders=5"
- POSTGRESQL_POSTGRES_PASSWORD=my-secret-passwd
- POSTGRESQL_DATABASE=tap_postgres_test
- ALLOW_EMPTY_PASSWORD=yes
db_replica:
image: "docker.io/bitnami/postgresql:12"
container_name: replica
ports:
- "5433:5432"
depends_on:
- db_primary
environment:
- POSTGRESQL_REPLICATION_MODE=slave
- POSTGRESQL_REPLICATION_USER=repl_user
- POSTGRESQL_REPLICATION_PASSWORD=repl_password
- POSTGRESQL_MASTER_HOST=db_primary
- POSTGRESQL_MASTER_PORT_NUMBER=5432
- ALLOW_EMPTY_PASSWORD=yes
15 changes: 14 additions & 1 deletion tap_postgres/__init__.py
@@ -405,9 +405,22 @@ def main_impl():
'debug_lsn': args.config.get('debug_lsn') == 'true',
'max_run_seconds': args.config.get('max_run_seconds', 43200),
'break_at_end_lsn': args.config.get('break_at_end_lsn', True),
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0))
'logical_poll_total_seconds': float(args.config.get('logical_poll_total_seconds', 0)),
'use_secondary': args.config.get('use_secondary', False),
}

if conn_config['use_secondary']:
try:
conn_config.update({
# Host and Port are mandatory.
'secondary_host': args.config['secondary_host'],
'secondary_port': args.config['secondary_port'],
})
except KeyError as exc:
raise ValueError(
"When 'use_secondary' enabled 'secondary_host' and 'secondary_port' must be defined."
) from exc

if args.config.get('ssl') == 'true':
conn_config['sslmode'] = 'require'
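
The validation added to `main_impl()` can be exercised in isolation. The sketch below assumes a hypothetical helper, `build_secondary_config`, that is not part of `tap_postgres`; it only mirrors the logic in the hunk above so that the behaviour can be checked without a database.

```python
def build_secondary_config(raw_config):
    """Return the replica-related settings extracted from a raw config dict.

    Hypothetical helper: it mirrors the validation in main_impl() but is
    not part of tap_postgres.
    """
    settings = {'use_secondary': raw_config.get('use_secondary', False)}

    if settings['use_secondary']:
        try:
            # Host and port are both mandatory once the flag is enabled.
            settings['secondary_host'] = raw_config['secondary_host']
            settings['secondary_port'] = raw_config['secondary_port']
        except KeyError as exc:
            raise ValueError(
                "When 'use_secondary' is enabled, 'secondary_host' and "
                "'secondary_port' must be defined."
            ) from exc

    return settings


print(build_secondary_config({
    'use_secondary': True,
    'secondary_host': 'replica',
    'secondary_port': 5433,
}))
```

As in the diff, the flag is tested for truthiness, so any truthy value enables it; a string such as `"false"` would also count as enabled, unlike `ssl` and `debug_lsn`, which are compared against the string `'true'`.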

10 changes: 9 additions & 1 deletion tap_postgres/db.py
@@ -36,7 +36,7 @@ def fully_qualified_table_name(schema, table):
return f'"{canonicalize_identifier(schema)}"."{canonicalize_identifier(table)}"'


def open_connection(conn_config, logical_replication=False):
def open_connection(conn_config, logical_replication=False, prioritize_primary=False):
cfg = {
'application_name': 'pipelinewise',
'host': conn_config['host'],
@@ -47,6 +47,14 @@ def open_connection(conn_config, logical_replication=False):
'connect_timeout': 30
}

if conn_config.get('use_secondary', False) and not prioritize_primary and not logical_replication:
# Try to use replica but fallback to primary if keys are missing. This is the same behavior as
# https://github.com/transferwise/pipelinewise/blob/master/pipelinewise/fastsync/commons/tap_postgres.py#L129
cfg.update({
'host': conn_config.get("secondary_host", conn_config['host']),
'port': conn_config.get("secondary_port", conn_config['port']),
})

if conn_config.get('sslmode'):
cfg['sslmode'] = conn_config['sslmode']
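
To make the routing rule in `open_connection()` easier to reason about, the sketch below isolates it in a hypothetical function, `select_endpoint`, which is not part of `tap_postgres` and opens no connection. It returns the host and port that the new branch would choose for a given combination of flags.

```python
def select_endpoint(conn_config, logical_replication=False, prioritize_primary=False):
    """Return the (host, port) pair a connection would target.

    Hypothetical helper mirroring the branch added to open_connection();
    it does not open a connection.
    """
    host, port = conn_config['host'], conn_config['port']

    # The replica is used only for ordinary reads: never for logical
    # replication and never when the caller asks for the primary.
    use_replica = (
        conn_config.get('use_secondary', False)
        and not prioritize_primary
        and not logical_replication
    )
    if use_replica:
        # Fall back to the primary when a secondary key is missing.
        host = conn_config.get('secondary_host', host)
        port = conn_config.get('secondary_port', port)

    return host, port


conn_config = {
    'host': 'primary', 'port': 5432,
    'use_secondary': True,
    'secondary_host': 'replica', 'secondary_port': 5433,
}

print(select_endpoint(conn_config))                            # ('replica', 5433)
print(select_endpoint(conn_config, prioritize_primary=True))   # ('primary', 5432)
print(select_endpoint(conn_config, logical_replication=True))  # ('primary', 5432)
```

The call sites in `logical_replication.py` pass these flags positionally (`False, True`); keyword arguments such as `prioritize_primary=True` might make the intent easier to read, though that is a style suggestion rather than something this commit does.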

12 changes: 6 additions & 6 deletions tap_postgres/sync_strategies/logical_replication.py
@@ -34,7 +34,7 @@ class UnsupportedPayloadKindError(Exception):

# pylint: disable=invalid-name,missing-function-docstring,too-many-branches,too-many-statements,too-many-arguments
def get_pg_version(conn_info):
with post_db.open_connection(conn_info, False) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
cur.execute("SELECT setting::int AS version FROM pg_settings WHERE name='server_version_num'")
version = cur.fetchone()[0]
@@ -93,7 +93,7 @@ def fetch_current_lsn(conn_config):
if version < 90400:
raise Exception('Logical replication not supported before PostgreSQL 9.4')

with post_db.open_connection(conn_config, False) as conn:
with post_db.open_connection(conn_config, False, True) as conn:
with conn.cursor() as cur:
# Use version specific lsn command
if version >= 100000:
@@ -138,7 +138,7 @@ def create_hstore_elem_query(elem):


def create_hstore_elem(conn_info, elem):
with post_db.open_connection(conn_info) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
query = create_hstore_elem_query(elem)
cur.execute(query)
@@ -151,7 +151,7 @@ def create_array_elem(elem, sql_datatype, conn_info):
if elem is None:
return None

with post_db.open_connection(conn_info) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
if sql_datatype == 'bit[]':
cast_datatype = 'boolean[]'
@@ -517,7 +517,7 @@ def locate_replication_slot_by_cur(cursor, dbname, tap_id=None):


def locate_replication_slot(conn_info):
with post_db.open_connection(conn_info, False) as conn:
with post_db.open_connection(conn_info, False, True) as conn:
with conn.cursor() as cur:
return locate_replication_slot_by_cur(cur, conn_info['dbname'], conn_info['tap_id'])

@@ -576,7 +576,7 @@ def sync_tables(conn_info, logical_streams, state, end_lsn, state_file):
version = get_pg_version(conn_info)

# Create replication connection and cursor
conn = post_db.open_connection(conn_info, True)
conn = post_db.open_connection(conn_info, True, True)
cur = conn.cursor()

# Set session wal_sender_timeout for PG12 and above
12 changes: 6 additions & 6 deletions tests/test_discovery.py
@@ -331,7 +331,7 @@ def setUp(self):
table_spec = {"columns": [{"name" : 'our_pk', "type" : "hstore", "primary_key" : True },
{"name" : 'our_hstore', "type" : "hstore" }],
"name" : TestHStoreTable.table_name}
with get_test_connection() as conn:
with get_test_connection(superuser=True) as conn:
cur = conn.cursor()
cur.execute(""" SELECT installed_version FROM pg_available_extensions WHERE name = 'hstore' """)
if cur.fetchone()[0] is None:
@@ -536,7 +536,7 @@ class TestColumnGrants(unittest.TestCase):
table_name = 'CHICKEN TIMES'
user = 'tmp_user_for_grant_tests'
password = 'password'

def setUp(self):
table_spec = {"columns": [{"name" : "id", "type" : "integer", "serial" : True},
{"name" : 'size integer', "type" : "integer", "quoted" : True},
@@ -545,7 +545,7 @@ def setUp(self):
"name" : TestColumnGrants.table_name}
ensure_test_table(table_spec)

with get_test_connection() as conn:
with get_test_connection(superuser=True) as conn:
cur = conn.cursor()

sql = """ DROP USER IF EXISTS {} """.format(self.user, self.password)
@@ -560,8 +560,8 @@ def setUp(self):
LOGGER.info("running sql: {}".format(sql))
cur.execute(sql)




def test_catalog(self):
conn_config = get_test_connection_config()
@@ -587,7 +587,7 @@ def test_catalog(self):
('properties', 'id'): {'inclusion': 'available',
'selected-by-default': True,
'sql-datatype': 'integer'}})

self.assertEqual({'definitions' : BASE_RECURSIVE_SCHEMAS,
'type': 'object',
'properties': {'id': {'type': ['null', 'integer'],