diff --git a/README.md b/README.md
index 2c84bad9..7910eebe 100644
--- a/README.md
+++ b/README.md
@@ -366,6 +366,7 @@ Other contributors:
 - Oliver Seemann: Handle large json, github actions, Zero-pad fixed-length binary fields (https://github.com/oseemann)
 - Mahadir Ahmad: Handle null json payload (https://github.com/mahadirz)
+- Mehmet Kartalbas: Add MySQL 5.7 column name support (https://github.com/kartalbas)
 - Axel Viala: Removal of Python 2.7 (https://github.com/darnuria)
 - Etern: Add XAPrepareEvent, parse last_committed & sequence_number of GtidEvent (https://github.com/etern)
 - Jason Fulghum: typo in ident variable name (https://github.com/fulghum)
diff --git a/docs/index.rst b/docs/index.rst
index c30f00bb..9cf57ed2 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -26,6 +26,7 @@ Contents
    installation
    limitations
+   mysql57_support
    binlogstream
    events
    examples
diff --git a/docs/mysql57_support.rst b/docs/mysql57_support.rst
new file mode 100644
index 00000000..85a6772b
--- /dev/null
+++ b/docs/mysql57_support.rst
@@ -0,0 +1,100 @@
+.. _mysql57_support:
+
+MySQL 5.7, MySQL 8.0+ and ``use_column_name_cache``
+=====================================================
+
+In MySQL 5.7 and earlier, the binary log events for row-based replication do not include column name metadata, so ``python-mysql-replication`` cannot map column values to their names directly from a binlog event.
+
+MySQL 8.0.1 introduced the ``binlog_row_metadata`` system variable (**GLOBAL** and **DYNAMIC**) to control how much metadata is written to the binary log. Its default value, ``MINIMAL``, gives the same behavior as MySQL 5.7.
+
+The Problem
+-----------
+
+When column metadata is not present in the binlog (MySQL 5.7 and earlier, or MySQL 8.0+ with ``binlog_row_metadata`` set to ``MINIMAL``), the ``values`` dictionary of a ``WriteRowsEvent``, ``UpdateRowsEvent``, or ``DeleteRowsEvent`` is keyed by column index rather than by column name.
+
+For example, for a table ``users`` with columns ``id`` and ``name``, an insert event might look like this:
+
+.. code-block:: python
+
+    {0: 1, 1: 'John Doe'}
+
+This makes replication logic harder to write and maintain, because you have to know the column order.
+
+The Solution: ``use_column_name_cache``
+-----------------------------------------
+
+To address this, ``python-mysql-replication`` provides the ``use_column_name_cache`` parameter on ``BinLogStreamReader``.
+
+When you set ``use_column_name_cache=True``, the library queries ``INFORMATION_SCHEMA.COLUMNS`` for a table's column names the first time it encounters an event for that table. The names are then cached in memory for subsequent events on the same table, avoiding redundant queries.
+
+This allows you to receive row data with column names as keys.
+
+MySQL 8.0+ with ``binlog_row_metadata=FULL``
+----------------------------------------------
+
+In MySQL 8.0.1 and later, you can set ``binlog_row_metadata`` to ``FULL`` with ``SET GLOBAL binlog_row_metadata = 'FULL'``. With this setting, column names are written directly into the binlog events and ``use_column_name_cache`` is not necessary.
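+
+If you are not sure which mode a server uses, you can check before deciding whether the cache is needed. Below is a minimal sketch using ``pymysql`` with illustrative connection settings; the variable does not exist on MySQL 5.7 and older, where an empty result means the cache is the only option:
+
+.. code-block:: python
+
+    import pymysql
+
+    conn = pymysql.connect(host="127.0.0.1", port=3306, user="root", password="")
+    with conn.cursor() as cursor:
+        cursor.execute("SHOW GLOBAL VARIABLES LIKE 'binlog_row_metadata'")
+        row = cursor.fetchone()
+    conn.close()
+
+    if row is None:
+        print("MySQL 5.7 or older: enable use_column_name_cache")
+    elif row[1] == "MINIMAL":
+        print("binlog_row_metadata=MINIMAL: enable use_column_name_cache")
+    else:
+        print("binlog_row_metadata=FULL: column names are already in the binlog")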
+
+Example
+-------
+
+Here is how to enable the column name cache when needed:
+
+.. code-block:: python
+
+    from pymysqlreplication import BinLogStreamReader
+    from pymysqlreplication.row_event import WriteRowsEvent
+
+    mysql_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'passwd': ''}
+
+    # Enable the column name cache for MySQL 5.7 or MySQL 8.0+ with binlog_row_metadata=MINIMAL
+    stream = BinLogStreamReader(
+        connection_settings=mysql_settings,
+        server_id=100,
+        use_column_name_cache=True
+    )
+
+    for binlogevent in stream:
+        if isinstance(binlogevent, WriteRowsEvent):
+            # Now you can access values by column name
+            user_id = binlogevent.rows[0]["values"]["id"]
+            user_name = binlogevent.rows[0]["values"]["name"]
+            print(f"New user: id={user_id}, name={user_name}")
+
+    stream.close()
+
+Important Considerations
+------------------------
+
+* **Performance:** Enabling ``use_column_name_cache`` adds one extra query per new table encountered in the binlog. Results are cached, so the impact is minimal after the initial query for each table.
+* **Permissions:** The MySQL user used for replication must be able to ``SELECT`` from ``INFORMATION_SCHEMA.COLUMNS`` for the replicated tables.
+* **Schema changes:** The cache is keyed by ``schema.table`` and is never invalidated automatically, so names fetched before an ``ALTER TABLE`` can be stale for later events (see the example below for clearing it manually).
+* **Default behavior:** The feature is disabled by default (``use_column_name_cache=False``) for backward compatibility and to avoid extra queries unless explicitly requested.
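+
+The cache itself is a private module-level dictionary. If a table definition changes while you are streaming, one possible mitigation (the test suite does the same) is to clear it so the next event triggers a fresh ``INFORMATION_SCHEMA`` lookup:
+
+.. code-block:: python
+
+    from pymysqlreplication import row_event
+
+    # Drop all cached column names; the next row event for each table
+    # will re-query INFORMATION_SCHEMA.COLUMNS.
+    row_event._COLUMN_NAME_CACHE.clear()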
""" self.__connection_settings = connection_settings @@ -254,6 +257,8 @@ def __init__( self.__ignore_decode_errors = ignore_decode_errors self.__verify_checksum = verify_checksum self.__optional_meta_data = False + self.__enable_logging = enable_logging + self.__use_column_name_cache = use_column_name_cache # We can't filter on packet level TABLE_MAP and rotate event because # we need them for handling other operations @@ -630,6 +635,8 @@ def fetchone(self): self.__ignore_decode_errors, self.__verify_checksum, self.__optional_meta_data, + self.__enable_logging, + self.__use_column_name_cache, ) if binlog_event.event_type == ROTATE_EVENT: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 868866de..374704b6 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -28,6 +28,8 @@ def __init__( ignore_decode_errors=False, verify_checksum=False, optional_meta_data=False, + enable_logging=False, + use_column_name_cache=False, ): self.packet = from_packet self.table_map = table_map diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index b70628fa..aa97087b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -74,6 +74,8 @@ def __init__( ignore_decode_errors, verify_checksum, optional_meta_data, + enable_logging, + use_column_name_cache=False, ): # -1 because we ignore the ok byte self.read_bytes = 0 @@ -82,6 +84,8 @@ def __init__( self.packet = from_packet self.charset = ctl_connection.charset + self.enable_logging = enable_logging + self.use_column_name_cache = use_column_name_cache # OK value # timestamp @@ -127,6 +131,8 @@ def __init__( ignore_decode_errors=ignore_decode_errors, verify_checksum=verify_checksum, optional_meta_data=optional_meta_data, + enable_logging=enable_logging, + use_column_name_cache=use_column_name_cache, ) if not self.event._processed: self.event = None diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 11429f74..3044ea53 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -1,6 +1,7 @@ import struct import decimal import datetime +import logging from pymysql.charset import charset_by_name from enum import Enum @@ -15,6 +16,10 @@ from .bitmap import BitCount, BitGet + +# MySQL 5.7 compatibility: Cache for INFORMATION_SCHEMA column names +_COLUMN_NAME_CACHE = {} + class RowsEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) @@ -746,6 +751,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_schemas = kwargs["ignored_schemas"] self.__freeze_schema = kwargs["freeze_schema"] self.__optional_meta_data = kwargs["optional_meta_data"] + self.__enable_logging = kwargs.get("enable_logging", False) + self.__use_column_name_cache = kwargs.get("use_column_name_cache", False) # Post-Header self.table_id = self._read_table_id() @@ -909,12 +916,70 @@ def _get_optional_meta_data(self): return optional_metadata + + def _fetch_column_names_from_schema(self): + """ + Fetch column names from INFORMATION_SCHEMA for MySQL 5.7 compatibility. + + Only executes if use_column_name_cache=True is enabled. + Uses module-level cache to avoid repeated queries. 
+ + Returns: + list: Column names in ORDINAL_POSITION order, or empty list + """ + # Only fetch if explicitly enabled (opt-in feature) + if not self.__use_column_name_cache: + return [] + + cache_key = f"{self.schema}.{self.table}" + + # Check cache first + if cache_key in _COLUMN_NAME_CACHE: + return _COLUMN_NAME_CACHE[cache_key] + + try: + query = """ + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s + ORDER BY ORDINAL_POSITION + """ + cursor = self._ctl_connection.cursor() + cursor.execute(query, (self.schema, self.table)) + rows = cursor.fetchall() + # Handle both tuple and dict cursor results + if rows and isinstance(rows[0], dict): + column_names = [row['COLUMN_NAME'] for row in rows] + else: + column_names = [row[0] for row in rows] + cursor.close() + + # Cache result + _COLUMN_NAME_CACHE[cache_key] = column_names + + if self.__enable_logging and column_names: + logging.info(f"Cached column names for {cache_key}: {len(column_names)} columns") + + return column_names + except Exception as e: + if self.__enable_logging: + logging.warning(f"Failed to fetch column names for {cache_key}: {type(e).__name__}: {e}") + # Cache empty result to avoid retry spam + _COLUMN_NAME_CACHE[cache_key] = [] + return [] + def _sync_column_info(self): if not self.__optional_meta_data: - # If optional_meta_data is False Do not sync Event Time Column Schemas + column_names = self._fetch_column_names_from_schema() + if column_names and len(column_names) == self.column_count: + for column_idx in range(self.column_count): + self.columns[column_idx].name = column_names[column_idx] return if len(self.optional_metadata.column_name_list) == 0: - # May Be Now BINLOG_ROW_METADATA = FULL But Before Action BINLOG_ROW_METADATA Mode = MINIMAL + column_names = self._fetch_column_names_from_schema() + if column_names and len(column_names) == self.column_count: + for column_idx in range(self.column_count): + self.columns[column_idx].name = column_names[column_idx] return charset_pos = 0 enum_or_set_pos = 0 diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index b2d61659..97d3c3be 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -1,11 +1,12 @@ -import pymysql import copy -from pymysqlreplication import BinLogStreamReader -import os import json +import os +import unittest + +import pymysql import pytest -import unittest +from pymysqlreplication import BinLogStreamReader def get_databases(): @@ -89,6 +90,12 @@ def isMySQL57(self): version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version == 5.7 + def isMySQL57AndMore(self): + if self.isMariaDB(): + return False + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) + return version >= 5.7 + def isMySQL80AndMore(self): if self.isMariaDB(): return False diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index fc3b635a..34067728 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1,19 +1,19 @@ import io import time import unittest +from unittest.mock import patch + +from pymysql.protocol import MysqlPacket -from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation -from pymysqlreplication.tests import base from pymysqlreplication import BinLogStreamReader -from pymysqlreplication.gtid import GtidSet, Gtid -from pymysqlreplication.event import * from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.constants.NONE_SOURCE 
import *
-from pymysqlreplication.row_event import *
+from pymysqlreplication.event import *
+from pymysqlreplication.gtid import Gtid, GtidSet
+from pymysqlreplication.json_binary import JsonDiff, JsonDiffOperation
 from pymysqlreplication.packet import BinLogPacketWrapper
-from pymysql.protocol import MysqlPacket
-from unittest.mock import patch
-
+from pymysqlreplication.row_event import *
+from pymysqlreplication.tests import base

 __all__ = [
     "TestBasicBinLogStreamReader",
@@ -271,6 +271,75 @@ def test_write_row_event(self):
         self.assertEqual(event.rows[0]["values"]["data"], "Hello World")
         self.assertEqual(event.columns[1].name, "data")

+    def test_fetch_column_names_from_schema(self):
+        # This test targets scenarios where column names are NOT in the binlog
+        # (MySQL 5.7 or older, or MySQL 8.0+ with binlog_row_metadata=MINIMAL)
+
+        # Check whether binlog_row_metadata is FULL (MySQL 8.0+); SHOW VARIABLES
+        # simply returns an empty result on MySQL 5.7 and older, so no error
+        # handling is needed for servers without the variable
+        cursor = self.execute("SHOW GLOBAL VARIABLES LIKE 'binlog_row_metadata'")
+        result = cursor.fetchone()
+        if result and result[1] == "FULL":
+            self.skipTest(
+                "binlog_row_metadata is FULL globally, use_column_name_cache is not needed"
+            )
+
+        query = "CREATE TABLE test_column_cache (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
+        self.execute(query)
+        self.execute("INSERT INTO test_column_cache (data) VALUES('Hello')")
+        self.execute("COMMIT")
+
+        # Test with use_column_name_cache = True
+        self.stream.close()
+        self.stream = BinLogStreamReader(
+            self.database,
+            server_id=1024,
+            use_column_name_cache=True,
+            only_events=[WriteRowsEvent],
+        )
+
+        event = self.stream.fetchone()
+        self.assertIsInstance(event, WriteRowsEvent)
+        self.assertEqual(event.table, "test_column_cache")
+        self.assertIn("id", event.rows[0]["values"])
+        self.assertIn("data", event.rows[0]["values"])
+        self.assertEqual(event.rows[0]["values"]["id"], 1)
+        self.assertEqual(event.rows[0]["values"]["data"], "Hello")
+
+        # Test with use_column_name_cache = False (the default)
+        # Clear the module-level cache so names fetched above cannot leak in
+        from pymysqlreplication import row_event
+
+        row_event._COLUMN_NAME_CACHE.clear()
+
+        # Reset the binlog and replay a fresh insert; resetBinLog() replaces
+        # self.stream with a default reader, so the filtered stream below must
+        # be created afterwards
+        self.resetBinLog()
+        self.execute("INSERT INTO test_column_cache (data) VALUES('World')")
+        self.execute("COMMIT")
+
+        self.stream.close()
+        self.stream = BinLogStreamReader(
+            self.database,
+            server_id=1025,  # different server_id than the default reader
+            use_column_name_cache=False,
+            only_events=[WriteRowsEvent],
+        )
+
+        # only_events already filters out rotate, format description, query
+        # and table map events, so the first event is the row event itself
+        event = self.stream.fetchone()
+        self.assertIsInstance(event, WriteRowsEvent)
+        self.assertEqual(event.table, "test_column_cache")
+        # With the cache disabled, values are keyed by column index
+        self.assertNotIn("id", event.rows[0]["values"])
+        self.assertNotIn("data", event.rows[0]["values"])
+
+        # cleanup
+        row_event._COLUMN_NAME_CACHE.clear()
+
     def test_delete_row_event(self):
         query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
         self.execute(query)
@@ -619,6 +688,8 @@ def 
create_binlog_packet_wrapper(pkt): self.stream._BinLogStreamReader__ignore_decode_errors, self.stream._BinLogStreamReader__verify_checksum, self.stream._BinLogStreamReader__optional_meta_data, + self.stream._BinLogStreamReader__enable_logging, + self.stream._BinLogStreamReader__use_column_name_cache, ) self.stream.close()