# -*- coding: utf-8 -*-
import binascii
import struct
import datetime
from pymysql.util import byte2int, int2byte
class BinLogEvent(object):
def __init__(self, from_packet, event_size, table_map, ctl_connection,
only_tables=None,
ignored_tables=None,
only_schemas=None,
ignored_schemas=None,
freeze_schema=False,
fail_on_table_metadata_unavailable=False):
self.packet = from_packet
self.table_map = table_map
self.event_type = self.packet.event_type
self.timestamp = self.packet.timestamp
self.event_size = event_size
self._ctl_connection = ctl_connection
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
# The event have been fully processed, if processed is false
# the event will be skipped
self._processed = True
self.complete = True
def _read_table_id(self):
# Table ID is 6 byte
# pad little-endian number
table_id = self.packet.read(6) + int2byte(0) + int2byte(0)
return struct.unpack('<Q', table_id)[0]
def dump(self):
print("=== %s ===" % (self.__class__.__name__))
print("Date: %s" % (datetime.datetime.fromtimestamp(self.timestamp)
.isoformat()))
print("Log position: %d" % self.packet.log_pos)
print("Event size: %d" % (self.event_size))
print("Read bytes: %d" % (self.packet.read_bytes))
self._dump()
print()
def _dump(self):
"""Core data dumped for the event"""
pass
[docs]class GtidEvent(BinLogEvent):
"""GTID change in binlog event
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(GtidEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
self.commit_flag = byte2int(self.packet.read(1)) == 1
self.sid = self.packet.read(16)
self.gno = struct.unpack('<Q', self.packet.read(8))[0]
@property
def gtid(self):
"""GTID = source_id:transaction_id
Eg: 3E11FA47-71CA-11E1-9E33-C80AA9429562:23
See: http://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html"""
nibbles = binascii.hexlify(self.sid).decode('ascii')
gtid = '%s-%s-%s-%s-%s:%d' % (
nibbles[:8], nibbles[8:12], nibbles[12:16], nibbles[16:20], nibbles[20:], self.gno
)
return gtid
def _dump(self):
print("Commit: %s" % self.commit_flag)
print("GTID_NEXT: %s" % self.gtid)
def __repr__(self):
return '<GtidEvent "%s">' % self.gtid
[docs]class RotateEvent(BinLogEvent):
"""Change MySQL bin log file
Attributes:
position: Position inside next binlog
next_binlog: Name of next binlog file
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RotateEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
self.position = struct.unpack('<Q', self.packet.read(8))[0]
self.next_binlog = self.packet.read(event_size - 8).decode()
def dump(self):
print("=== %s ===" % (self.__class__.__name__))
print("Position: %d" % self.position)
print("Next binlog file: %s" % self.next_binlog)
print()
class FormatDescriptionEvent(BinLogEvent):
pass
class StopEvent(BinLogEvent):
pass
[docs]class XidEvent(BinLogEvent):
"""A COMMIT event
Attributes:
xid: Transaction ID for 2PC
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(XidEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
self.xid = struct.unpack('<Q', self.packet.read(8))[0]
def _dump(self):
super(XidEvent, self)._dump()
print("Transaction ID: %d" % (self.xid))
[docs]class HeartbeatLogEvent(BinLogEvent):
"""A Heartbeat event
Heartbeats are sent by the master only if there are no unsent events in the
binary log file for a period longer than the interval defined by
MASTER_HEARTBEAT_PERIOD connection setting.
A mysql server will also play those to the slave for each skipped
events in the log. I (baloo) believe the intention is to make the slave
bump its position so that if a disconnection occurs, the slave only
reconnects from the last skipped position (see Binlog_sender::send_events
in sql/rpl_binlog_sender.cc). That makes 106 bytes of data for skipped
event in the binlog. *this is also the case with GTID replication*. To
mitigate such behavior, you are expected to keep the binlog small (see
max_binlog_size, defaults to 1G).
In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
Attributes:
ident: Name of the current binlog
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(HeartbeatLogEvent, self).__init__(from_packet, event_size,
table_map, ctl_connection,
**kwargs)
self.ident = self.packet.read(event_size).decode()
def _dump(self):
super(HeartbeatLogEvent, self)._dump()
print("Current binlog: %s" % (self.ident))
[docs]class QueryEvent(BinLogEvent):
'''This evenement is trigger when a query is run of the database.
Only replicated queries are logged.'''
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(QueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
# Post-header
self.slave_proxy_id = self.packet.read_uint32()
self.execution_time = self.packet.read_uint32()
self.schema_length = byte2int(self.packet.read(1))
self.error_code = self.packet.read_uint16()
self.status_vars_length = self.packet.read_uint16()
# Payload
self.status_vars = self.packet.read(self.status_vars_length)
self.schema = self.packet.read(self.schema_length)
self.packet.advance(1)
self.query = self.packet.read(event_size - 13 - self.status_vars_length
- self.schema_length - 1).decode("utf-8")
#string[EOF] query
def _dump(self):
super(QueryEvent, self)._dump()
print("Schema: %s" % (self.schema))
print("Execution time: %d" % (self.execution_time))
print("Query: %s" % (self.query))
[docs]class BeginLoadQueryEvent(BinLogEvent):
"""
Attributes:
file_id
block-data
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(BeginLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
# Payload
self.file_id = self.packet.read_uint32()
self.block_data = self.packet.read(event_size - 4)
def _dump(self):
super(BeginLoadQueryEvent, self)._dump()
print("File id: %d" % (self.file_id))
print("Block data: %s" % (self.block_data))
[docs]class ExecuteLoadQueryEvent(BinLogEvent):
"""
Attributes:
slave_proxy_id
execution_time
schema_length
error_code
status_vars_length
file_id
start_pos
end_pos
dup_handling_flags
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(ExecuteLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
# Post-header
self.slave_proxy_id = self.packet.read_uint32()
self.execution_time = self.packet.read_uint32()
self.schema_length = self.packet.read_uint8()
self.error_code = self.packet.read_uint16()
self.status_vars_length = self.packet.read_uint16()
# Payload
self.file_id = self.packet.read_uint32()
self.start_pos = self.packet.read_uint32()
self.end_pos = self.packet.read_uint32()
self.dup_handling_flags = self.packet.read_uint8()
def _dump(self):
super(ExecuteLoadQueryEvent, self)._dump()
print("Slave proxy id: %d" % (self.slave_proxy_id))
print("Execution time: %d" % (self.execution_time))
print("Schema length: %d" % (self.schema_length))
print("Error code: %d" % (self.error_code))
print("Status vars length: %d" % (self.status_vars_length))
print("File id: %d" % (self.file_id))
print("Start pos: %d" % (self.start_pos))
print("End pos: %d" % (self.end_pos))
print("Dup handling flags: %d" % (self.dup_handling_flags))
[docs]class IntvarEvent(BinLogEvent):
"""
Attributes:
type
value
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(IntvarEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
# Payload
self.type = self.packet.read_uint8()
self.value = self.packet.read_uint32()
def _dump(self):
super(IntvarEvent, self)._dump()
print("type: %d" % (self.type))
print("Value: %d" % (self.value))
class NotImplementedEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(NotImplementedEvent, self).__init__(
from_packet, event_size, table_map, ctl_connection, **kwargs)
self.packet.advance(event_size)