# -*- coding: utf-8 -*-
import pymysql
import struct
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
from pymysql.cursors import DictCursor
from pymysql.util import int2byte
from .packet import BinLogPacketWrapper
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
from .gtid import GtidSet
from .event import (
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent)
from .exceptions import BinLogNotEnabled
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
try:
    from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID
except ImportError:
    # Old pymysql versions do not define this command code, so fall back to
    # its numeric value.
    # See: https://github.com/PyMySQL/PyMySQL/pull/261
    COM_BINLOG_DUMP_GTID = 0x1e

# Error codes that are treated as a recoverable connection loss (the reader
# reconnects instead of propagating the error):
# 2013 Connection Lost
# 2006 MySQL server has gone away
MYSQL_EXPECTED_ERROR_CODES = [2013, 2006]
class ReportSlave(object):
    """Represent the values that you may report when connecting as a slave
    to a master. SHOW SLAVE HOSTS related"""

    # Class-level defaults, used for any field the caller does not provide.
    hostname = ''
    username = ''
    password = ''
    port = 0

    def __init__(self, value):
        """
        Attributes:
            value: string, tuple/list or dict
                   if string, it is used as the hostname
                   if tuple or list, it is used as
                   (hostname, username, password, port); trailing items
                   may be omitted and keep their defaults
                   if dict, the keys 'hostname', 'username', 'password'
                   and 'port' are read; missing keys keep their defaults

        The port is always converted with ``int()`` so that both the tuple
        and the dict form accept a numeric string.
        """
        if isinstance(value, (tuple, list)):
            try:
                self.hostname = value[0]
                self.username = value[1]
                self.password = value[2]
                self.port = int(value[3])
            except IndexError:
                # A short sequence is allowed: remaining fields keep their
                # class-level defaults.
                pass
        elif isinstance(value, dict):
            for key in ['hostname', 'username', 'password', 'port']:
                try:
                    item = value[key]
                except KeyError:
                    continue
                if key == 'port':
                    # Same coercion as the tuple form; __repr__ and
                    # encoded() both require an integer port.
                    item = int(item)
                setattr(self, key, item)
        else:
            self.hostname = value

    def __repr__(self):
        return '<ReportSlave hostname=%s username=%s password=%s port=%d>' %\
            (self.hostname, self.username, self.password, self.port)

    def encoded(self, server_id, master_id=0):
        """Build the raw COM_REGISTER_SLAVE packet for this slave.

        server_id: the slave server-id
        master_id: usually 0. Appears as "master id" in SHOW SLAVE HOSTS
                   on the master. Unknown what else it impacts.

        Returns the packet as bytes, including the 4-byte packet header.

        Hostname, username and password are each sent with a single length
        byte, so their encoded form is truncated to 255 bytes. The packet
        length is computed from the truncated values so that the header
        always matches the payload.
        """
        # 1              [15] COM_REGISTER_SLAVE
        # 4              server-id
        # 1              slaves hostname length
        # string[$len]   slaves hostname
        # 1              slaves user len
        # string[$len]   slaves user
        # 1              slaves password len
        # string[$len]   slaves password
        # 2              slaves mysql-port
        # 4              replication rank
        # 4              master-id

        # Largest payload a one-byte length prefix can describe.
        MAX_STRING_LEN = 255

        hostname = self.hostname.encode()[:MAX_STRING_LEN]
        username = self.username.encode()[:MAX_STRING_LEN]
        password = self.password.encode()[:MAX_STRING_LEN]

        packet_len = (1 +  # command
                      4 +  # server-id
                      1 +  # hostname length
                      len(hostname) +
                      1 +  # username length
                      len(username) +
                      1 +  # password length
                      len(password) +
                      2 +  # slave mysql port
                      4 +  # replication rank
                      4)  # master-id

        # The 'p' (Pascal string) format writes the length byte followed by
        # the data; the count is the total size, hence the "+ 1".
        return (struct.pack('<i', packet_len) +
                int2byte(COM_REGISTER_SLAVE) +
                struct.pack('<L', server_id) +
                struct.pack('<%dp' % (len(hostname) + 1), hostname) +
                struct.pack('<%dp' % (len(username) + 1), username) +
                struct.pack('<%dp' % (len(password) + 1), password) +
                struct.pack('<H', self.port) +
                struct.pack('<l', 0) +
                struct.pack('<l', master_id))
class BinLogStreamReader(object):
    """Connect to replication stream and read event

    Two connections are used: a "stream" connection on which the binlog dump
    is requested and events are read, and a "ctl" connection used to query
    ``information_schema`` for table column metadata. Both are opened lazily
    by ``fetchone()``.
    """
    report_slave = None

    def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False,
                 blocking=False, only_events=None, log_file=None, log_pos=None,
                 filter_non_implemented_events=True,
                 ignored_events=None, auto_position=None,
                 only_tables=None, ignored_tables=None,
                 only_schemas=None, ignored_schemas=None,
                 freeze_schema=False, skip_to_timestamp=None,
                 report_slave=None, slave_uuid=None,
                 pymysql_wrapper=None,
                 fail_on_table_metadata_unavailable=False,
                 slave_heartbeat=None):
        """
        Attributes:
            connection_settings: Connection settings (dict) passed as keyword
                    arguments when opening the stream connection. A default
                    "charset" of "utf8" is added to this dict if missing.
            server_id: Server id this reader announces to the master
            ctl_connection_settings: Connection settings for cluster holding schema information
            resume_stream: Start for event from position or the latest event of
                           binlog or from older available event
            blocking: Read on stream is blocking
            only_events: Array of allowed events
            ignored_events: Array of ignored events
            log_file: Set replication start log file
            log_pos: Set replication start log pos (resume_stream should be true)
            auto_position: Use master_auto_position gtid to set position
            only_tables: An array with the tables you want to watch (only works
                         in binlog_format ROW)
            ignored_tables: An array with the tables you want to skip
            only_schemas: An array with the schemas you want to watch
            ignored_schemas: An array with the schemas you want to skip
            freeze_schema: If true do not support ALTER TABLE. It's faster.
            skip_to_timestamp: Ignore all events until reaching specified timestamp.
            report_slave: Report slave in SHOW SLAVE HOSTS.
            slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
            pymysql_wrapper: Callable used instead of pymysql.connect to open
                             both connections
            fail_on_table_metadata_unavailable: Should raise exception if we can't get
                                                table information on row_events
            slave_heartbeat: (seconds) Should master actively send heartbeat on
                             connection. This also reduces traffic in GTID replication
                             on replication resumption (in case many event to skip in
                             binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation
                             for semantics
        """

        self.__connection_settings = connection_settings
        self.__connection_settings.setdefault("charset", "utf8")

        self.__connected_stream = False
        self.__connected_ctl = False
        self.__resume_stream = resume_stream
        self.__blocking = blocking
        self._ctl_connection_settings = ctl_connection_settings
        if ctl_connection_settings:
            self._ctl_connection_settings.setdefault("charset", "utf8")

        self.__only_tables = only_tables
        self.__ignored_tables = ignored_tables
        self.__only_schemas = only_schemas
        self.__ignored_schemas = ignored_schemas
        self.__freeze_schema = freeze_schema
        self.__allowed_events = self._allowed_event_list(
            only_events, ignored_events, filter_non_implemented_events)
        self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable

        # We can't filter on packet level TABLE_MAP and rotate event because
        # we need them for handling other operations
        self.__allowed_events_in_packet = frozenset(
            [TableMapEvent, RotateEvent]).union(self.__allowed_events)

        self.__server_id = server_id
        self.__use_checksum = False

        # Store table meta information
        self.table_map = {}
        self.log_pos = log_pos
        self.log_file = log_file
        self.auto_position = auto_position
        self.skip_to_timestamp = skip_to_timestamp

        if report_slave:
            self.report_slave = ReportSlave(report_slave)
        self.slave_uuid = slave_uuid
        self.slave_heartbeat = slave_heartbeat

        if pymysql_wrapper:
            self.pymysql_wrapper = pymysql_wrapper
        else:
            self.pymysql_wrapper = pymysql.connect

    def close(self):
        """Close whichever of the stream and ctl connections are open."""
        if self.__connected_stream:
            self._stream_connection.close()
            self.__connected_stream = False
        if self.__connected_ctl:
            # break reference cycle between stream reader and underlying
            # mysql connection object
            self._ctl_connection._get_table_information = None
            self._ctl_connection.close()
            self.__connected_ctl = False

    @staticmethod
    def _version_tuple(version):
        """Return the leading numeric components of *version* as a tuple.

        "0.10.1" becomes (0, 10, 1) and "1.0.0rc1" becomes (1, 0, 0).
        Parsing stops at the first component that does not start with a
        digit. Comparing tuples avoids the lexicographic trap of comparing
        version strings, where "0.10" sorts before "0.6".
        """
        numbers = []
        for piece in str(version).split('.'):
            digits = ''
            for char in piece:
                if not char.isdigit():
                    break
                digits += char
            if not digits:
                break
            numbers.append(int(digits))
        return tuple(numbers)

    def __legacy_pymysql(self):
        """Return True when the installed pymysql is older than 0.6."""
        return self._version_tuple(pymysql.__version__) < (0, 6)

    def __send_packet(self, packet):
        """Write a raw packet on the stream connection.

        pymysql older than 0.6 exposes the socket file as ``wfile``; newer
        versions use ``_write_bytes`` and track the sequence id themselves,
        so it is set to 1 for the packet the server sends next.
        """
        if self.__legacy_pymysql():
            self._stream_connection.wfile.write(packet)
            self._stream_connection.wfile.flush()
        else:
            self._stream_connection._write_bytes(packet)
            self._stream_connection._next_seq_id = 1

    def __read_packet(self):
        """Read one raw packet from the stream connection."""
        if self.__legacy_pymysql():
            return self._stream_connection.read_packet()
        return self._stream_connection._read_packet()

    def __run_on_stream(self, query, args=None, fetch=False):
        """Execute *query* on the stream connection, always closing the cursor.

        Returns the first row when *fetch* is true, otherwise None.
        """
        cur = self._stream_connection.cursor()
        try:
            if args is None:
                cur.execute(query)
            else:
                cur.execute(query, args)
            if fetch:
                return cur.fetchone()
            return None
        finally:
            cur.close()

    def __connect_to_ctl(self):
        """Open the ctl connection used to look up table metadata.

        When no ctl settings were given, a copy of the stream settings is
        used. The connection always targets ``information_schema`` and uses
        a DictCursor.
        """
        if not self._ctl_connection_settings:
            self._ctl_connection_settings = dict(self.__connection_settings)
        self._ctl_connection_settings["db"] = "information_schema"
        self._ctl_connection_settings["cursorclass"] = DictCursor
        self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
        # The connection object carries the lookup callback so that it can
        # be reached through the connection handed to BinLogPacketWrapper.
        self._ctl_connection._get_table_information = self.__get_table_information
        self.__connected_ctl = True

    def __checksum_enabled(self):
        """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6

        Returns False when the server does not report the BINLOG_CHECKSUM
        variable or reports it as 'NONE'; any other value counts as enabled.
        """
        result = self.__run_on_stream(
            "SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'", fetch=True)

        if result is None:
            return False
        _, value = result[:2]
        if value == 'NONE':
            return False
        return True

    def _register_slave(self):
        """Send COM_REGISTER_SLAVE when a report_slave was configured."""
        if not self.report_slave:
            return

        packet = self.report_slave.encoded(self.__server_id)

        self.__send_packet(packet)
        # Consume the server's reply to the registration.
        self.__read_packet()

    def __connect_to_stream(self):
        """Open the stream connection and request the binlog dump.

        Raises BinLogNotEnabled when no start position was given and
        SHOW MASTER STATUS returns no row.
        """
        # log_pos (4) -- position in the binlog-file to start the stream with
        # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
        # server_id (4) -- server id of this slave
        # log_file (string.EOF) -- filename of the binlog on the master
        self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)

        self.__use_checksum = self.__checksum_enabled()

        # If checksum is enabled we need to inform the server that
        # we support it
        if self.__use_checksum:
            self.__run_on_stream(
                "set @master_binlog_checksum= @@global.binlog_checksum")

        if self.slave_uuid:
            # Passed as a query parameter so the driver quotes and escapes it.
            self.__run_on_stream("set @slave_uuid= %s", (self.slave_uuid,))

        if self.slave_heartbeat:
            # 4294967 is documented as the max value for heartbeats
            net_timeout = float(self.__connection_settings.get('read_timeout',
                                                               4294967))
            # If heartbeat is too low, the connection will disconnect before,
            # this is also the behavior in mysql
            heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
            if heartbeat > 4294967:
                heartbeat = 4294967

            # master_heartbeat_period is nanoseconds
            heartbeat = int(heartbeat * 1000000000)
            self.__run_on_stream("set @master_heartbeat_period= %d" % heartbeat)

        self._register_slave()

        if not self.auto_position:
            # only when log_file and log_pos both provided, the position info is
            # valid, if not, get the current position from master
            if self.log_file is None or self.log_pos is None:
                master_status = self.__run_on_stream("SHOW MASTER STATUS",
                                                     fetch=True)
                if master_status is None:
                    raise BinLogNotEnabled()
                self.log_file, self.log_pos = master_status[:2]

            # Encode once: the packet length must count bytes, not characters.
            log_file = self.log_file.encode()

            # 11 = command (1) + log_pos (4) + flags (2) + server_id (4)
            prelude = struct.pack('<i', len(log_file) + 11) \
                + int2byte(COM_BINLOG_DUMP)

            # Without resume_stream the dump starts at position 4.
            start_pos = self.log_pos if self.__resume_stream else 4
            prelude += struct.pack('<I', start_pos)

            # flags: 0 when blocking, 1 (BINLOG_DUMP_NON_BLOCK) otherwise
            flags = 0 if self.__blocking else 1
            prelude += struct.pack('<h', flags)

            prelude += struct.pack('<I', self.__server_id)
            prelude += log_file
        else:
            # Format for mysql packet master_auto_position
            #
            # All fields are little endian
            # All fields are unsigned

            # Packet length   uint   4bytes
            # Packet type     byte   1byte   == 0x1e
            # Binlog flags    ushort 2bytes  == 0 (for retrocompatibility)
            # Server id       uint   4bytes
            # binlognamesize  uint   4bytes
            # binlogname      str    Nbytes  N = binlognamesize
            #                                Zeroified
            # binlog position uint   8bytes  == 4
            # payload_size    uint   4bytes

            # What come next, is the payload, where the slave gtid_executed
            # is sent to the master
            # n_sid           ulong  8bytes  == which size is the gtid_set
            # | sid           uuid   16bytes UUID as a binary
            # | n_intervals   ulong  8bytes  == how many intervals are sent for this gtid
            # | | start       ulong  8bytes  Start position of this interval
            # | | stop        ulong  8bytes  Stop position of this interval

            # A gtid set looks like:
            #   19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
            #   1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
            #
            # In this particular gtid set, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
            # is the first member of the set, it is called a gtid.
            # In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
            # and have two intervals, 1-3 and 8-10, 1 is the start position of the first interval
            # 3 is the stop position of the first interval.

            gtid_set = GtidSet(self.auto_position)
            encoded_data_size = gtid_set.encoded_length

            packet_size = (1 +  # command byte
                           2 +  # binlog_flags
                           4 +  # server_id
                           4 +  # binlog_name_info_size
                           3 +  # zero-filled binlog name
                           8 +  # binlog_pos_info
                           4 +  # encoded_data_size
                           encoded_data_size)

            prelude = struct.pack('<i', packet_size) \
                + int2byte(COM_BINLOG_DUMP_GTID)

            # binlog_flags = 0 (2 bytes)
            prelude += struct.pack('<H', 0)
            # server_id (4 bytes)
            prelude += struct.pack('<I', self.__server_id)
            # binlog_name_info_size (4 bytes), value 3
            prelude += struct.pack('<I', 3)
            # zero-filled binlog name (3 bytes)
            prelude += b'\0\0\0'
            # binlog_pos_info (8 bytes)
            prelude += struct.pack('<Q', 4)

            # encoded_data_size (4 bytes)
            prelude += struct.pack('<I', encoded_data_size)
            # encoded_data
            prelude += gtid_set.encoded()

        self.__send_packet(prelude)
        self.__connected_stream = True

    def fetchone(self):
        """Return the next allowed event, or None once the stream ends.

        Opens the stream and ctl connections on demand. When the stream
        connection fails with one of MYSQL_EXPECTED_ERROR_CODES it is closed
        and reopened on the next loop iteration; other OperationalErrors
        propagate. An EOF packet closes the reader and returns None.
        """
        while True:
            if not self.__connected_stream:
                self.__connect_to_stream()

            if not self.__connected_ctl:
                self.__connect_to_ctl()

            try:
                pkt = self.__read_packet()
            except pymysql.OperationalError as error:
                # Do not assume exactly (code, message): some errors carry
                # fewer or more arguments.
                code = error.args[0] if error.args else None
                if code in MYSQL_EXPECTED_ERROR_CODES:
                    self._stream_connection.close()
                    self.__connected_stream = False
                    continue
                raise

            if pkt.is_eof_packet():
                self.close()
                return None

            if not pkt.is_ok_packet():
                continue

            binlog_event = BinLogPacketWrapper(pkt, self.table_map,
                                               self._ctl_connection,
                                               self.__use_checksum,
                                               self.__allowed_events_in_packet,
                                               self.__only_tables,
                                               self.__ignored_tables,
                                               self.__only_schemas,
                                               self.__ignored_schemas,
                                               self.__freeze_schema,
                                               self.__fail_on_table_metadata_unavailable)

            if binlog_event.event_type == ROTATE_EVENT:
                self.log_pos = binlog_event.event.position
                self.log_file = binlog_event.event.next_binlog
                # Table Id in binlog are NOT persistent in MySQL - they are in-memory identifiers
                # that means that when MySQL master restarts, it will reuse same table id for different tables
                # which will cause errors for us since our in-memory map will try to decode row data with
                # wrong table schema.
                # The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
                # restarts. That means every rotation we see *could* be a sign of restart and so potentially
                # invalidates all our cached table id to schema mappings. This means we have to load them all
                # again for each logfile which is potentially wasted effort but we can't really do much better
                # without being broken in restart case
                self.table_map = {}
            elif binlog_event.log_pos:
                self.log_pos = binlog_event.log_pos

            # This check must not occur before clearing the ``table_map`` as a
            # result of a RotateEvent.
            #
            # The first RotateEvent in a binlog file has a timestamp of
            # zero.  If the server has moved to a new log and not written a
            # timestamped RotateEvent at the end of the previous log, the
            # RotateEvent at the beginning of the new log will be ignored
            # if the caller provided a positive ``skip_to_timestamp``
            # value.  This will result in the ``table_map`` becoming
            # corrupt.
            #
            # https://dev.mysql.com/doc/internals/en/event-data-for-specific-event-types.html
            # From the MySQL Internals Manual:
            #
            #   ROTATE_EVENT is generated locally and written to the binary
            #   log on the master. It is written to the relay log on the
            #   slave when FLUSH LOGS occurs, and when receiving a
            #   ROTATE_EVENT from the master. In the latter case, there
            #   will be two rotate events in total originating on different
            #   servers.
            #
            #   There are conditions under which the terminating
            #   log-rotation event does not occur. For example, the server
            #   might crash.
            if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
                continue

            if binlog_event.event_type == TABLE_MAP_EVENT and \
                    binlog_event.event is not None:
                self.table_map[binlog_event.event.table_id] = \
                    binlog_event.event.get_table()

            # event is none if we have filter it on packet level
            # we filter also not allowed events
            if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
                continue

            return binlog_event.event

    def _allowed_event_list(self, only_events, ignored_events,
                            filter_non_implemented_events):
        """Return the frozenset of event classes that fetchone() may return.

        only_events: when not None, the starting set; otherwise every event
                     class known to this module is allowed
        ignored_events: when not None, classes removed from the set; a class
                        that is not in the set is silently skipped
        filter_non_implemented_events: when true, NotImplementedEvent is
                                       removed as well
        """
        if only_events is not None:
            events = set(only_events)
        else:
            events = set((
                QueryEvent,
                RotateEvent,
                StopEvent,
                FormatDescriptionEvent,
                XidEvent,
                GtidEvent,
                BeginLoadQueryEvent,
                ExecuteLoadQueryEvent,
                UpdateRowsEvent,
                WriteRowsEvent,
                DeleteRowsEvent,
                TableMapEvent,
                HeartbeatLogEvent,
                NotImplementedEvent,
            ))
        if ignored_events is not None:
            for e in ignored_events:
                # discard, not remove: ignoring an event that is already
                # excluded (e.g. absent from only_events) is not an error.
                events.discard(e)
        if filter_non_implemented_events:
            events.discard(NotImplementedEvent)
        return frozenset(events)

    def __get_table_information(self, schema, table):
        """Return the column metadata rows for ``schema.table``.

        Queries information_schema.columns on the ctl connection, ordered by
        ORDINAL_POSITION. The lookup is attempted twice: when the ctl
        connection fails with one of MYSQL_EXPECTED_ERROR_CODES it is marked
        disconnected and reopened for the second attempt. Other
        OperationalErrors propagate. Returns None when both attempts hit an
        expected connection error.
        """
        for _ in range(2):
            try:
                if not self.__connected_ctl:
                    self.__connect_to_ctl()

                cur = self._ctl_connection.cursor()
                cur.execute("""
                    SELECT
                        COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
                        COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY
                    FROM
                        information_schema.columns
                    WHERE
                        table_schema = %s AND table_name = %s
                    ORDER BY ORDINAL_POSITION
                    """, (schema, table))

                return cur.fetchall()
            except pymysql.OperationalError as error:
                # Do not assume exactly (code, message) in error.args.
                code = error.args[0] if error.args else None
                if code in MYSQL_EXPECTED_ERROR_CODES:
                    self.__connected_ctl = False
                    continue
                raise
        return None

    def __iter__(self):
        """Iterate over events by calling fetchone() until it returns None."""
        return iter(self.fetchone, None)