Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/command-line-flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ Add this flag when executing on a 1st generation Google Cloud Platform (GCP).

### gtid

Add this flag to enable support for [MySQL replication GTIDs](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-concepts.html) for replication positioning. This requires `gtid_mode` and `enforce_gtid_consistency` to be set to `ON`.
Add this flag to enable support for [MySQL replication GTIDs](https://dev.mysql.com/doc/refman/5.7/en/replication-gtids-concepts.html) for replication positioning. On MySQL this requires `gtid_mode` and `enforce_gtid_consistency` to be set to `ON`.

[MariaDB GTIDs](https://mariadb.com/kb/en/gtid/) are also supported: gh-ost detects the server flavor automatically and uses the appropriate GTID dialect. MariaDB has no `gtid_mode`/`enforce_gtid_consistency` settings — GTIDs are always recorded when binary logging is enabled, so no extra configuration is required beyond `--gtid`.

### heartbeat-interval-millis

Expand Down
28 changes: 20 additions & 8 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
uuid "github.com/google/uuid"
)

type RowsEventFilterFunc func(databaseName, tableName string) bool
Expand Down Expand Up @@ -60,7 +59,7 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext, rowsEventFilters
}
config := replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Flavor: mysql.FlavorFor(migrationContext.InspectorMySQLVersion),
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Expand Down Expand Up @@ -177,11 +176,19 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
}

switch event := ev.Event.(type) {
case *replication.GTIDEvent:
case *replication.GTIDEvent, *replication.MariadbGTIDEvent:
// MySQL emits *GTIDEvent, MariaDB emits *MariadbGTIDEvent; both
// implement BinlogGTIDEvent.GTIDNext() returning the GTID about to
// be applied. We advance currentCoordinates by merging it into the
// running GTID set, regardless of flavor.
if !gmr.migrationContext.UseGTIDs {
continue
}
sid, err := uuid.FromBytes(event.SID)
gtidEvent, ok := ev.Event.(gomysql.BinlogGTIDEvent)
if !ok {
return fmt.Errorf("unexpected GTID event type: %T", ev.Event)
}
nextGTID, err := gtidEvent.GTIDNext()
if err != nil {
return err
}
Expand All @@ -191,10 +198,11 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
}
coords := gmr.currentCoordinates.(*mysql.GTIDBinlogCoordinates)
if coords.GTIDSet == nil {
gtidSet := gomysql.NewMysqlGTIDSet()
coords.GTIDSet = &gtidSet
coords.GTIDSet = nextGTID
} else if err := coords.GTIDSet.Update(nextGTID.String()); err != nil {
gmr.currentCoordinatesMutex.Unlock()
return err
}
coords.GTIDSet.AddGTID(sid, event.GNO)
gmr.currentCoordinatesMutex.Unlock()
case *replication.RotateEvent:
if gmr.migrationContext.UseGTIDs {
Expand All @@ -207,7 +215,11 @@ func (gmr *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChan
gmr.currentCoordinatesMutex.Unlock()
case *replication.XIDEvent:
if gmr.migrationContext.UseGTIDs {
gmr.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet.(*gomysql.MysqlGTIDSet)}
// event.GSet is the full executed GTID set maintained by the
// syncer (MysqlGTIDSet or MariadbGTIDSet depending on flavor).
if event.GSet != nil {
gmr.LastTrxCoords = &mysql.GTIDBinlogCoordinates{GTIDSet: event.GSet}
}
} else {
gmr.LastTrxCoords = gmr.currentCoordinates.Clone()
}
Expand Down
2 changes: 1 addition & 1 deletion go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
}
chk.Timestamp = time.Unix(timestamp, 0)
if apl.migrationContext.UseGTIDs {
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
gtidCoords, err := mysql.NewGTIDBinlogCoordinates(mysql.FlavorFor(apl.migrationContext.ApplierMySQLVersion), coordStr)
if err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,13 @@ func (isp *Inspector) validateBinlogs() error {

// validateGTIDConfig checks that the GTID configuration is good to go
func (isp *Inspector) validateGTIDConfig() error {
if mysql.IsMariaDB(isp.dbVersion) {
// MariaDB has no @@gtid_mode / @@enforce_gtid_consistency: GTIDs are
// always recorded in the binary log when binary logging is enabled
// (which is validated separately). Nothing else to check.
isp.migrationContext.Log.Infof("MariaDB GTID config validated on %s", isp.connectionConfig.Key.String())
return nil
}
var gtidMode, enforceGtidConsistency string
query := `select @@global.gtid_mode, @@global.enforce_gtid_consistency`
if err := isp.db.QueryRow(query).Scan(&gtidMode, &enforceGtidConsistency); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) {

// A DML on the original table at GTID :100 is observed and enqueued, but
// not yet applied.
dmlCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-100")
dmlCoords, err := mysql.NewGTIDBinlogCoordinates(mysql.MySQLFlavor, srcUUID+":1-100")
require.NoError(t, err)
migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{
DmlEvent: &binlog.BinlogDMLEvent{
Expand All @@ -224,7 +224,7 @@ func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) {

// A heartbeat row is then written; its GTID set includes the un-applied
// DML plus a few additional transactions.
heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-105")
heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(mysql.MySQLFlavor, srcUUID+":1-105")
require.NoError(t, err)
heartbeatColumnValues := sql.ToColumnValues([]interface{}{
123,
Expand Down
27 changes: 24 additions & 3 deletions go/logic/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/openark/golib/sqlutils"
)

Expand Down Expand Up @@ -162,17 +161,22 @@ func (es *EventsStreamer) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates

// readCurrentBinlogCoordinates reads master status from hooked server
func (es *EventsStreamer) readCurrentBinlogCoordinates() error {
// MariaDB exposes no GTID column in SHOW MASTER STATUS; its current binlog
// GTID position lives in @@global.gtid_binlog_pos.
if es.migrationContext.UseGTIDs && mysql.IsMariaDB(es.dbVersion) {
return es.readCurrentMariaDBGTIDCoordinates()
}
binaryLogStatusTerm := mysql.ReplicaTermFor(es.dbVersion, "master status")
query := fmt.Sprintf("show /* gh-ost readCurrentBinlogCoordinates */ %s", binaryLogStatusTerm)
foundMasterStatus := false
err := sqlutils.QueryRowsMap(es.db, query, func(m sqlutils.RowMap) error {
if es.migrationContext.UseGTIDs {
execGtidSet := m.GetString("Executed_Gtid_Set")
gtidSet, err := gomysql.ParseMysqlGTIDSet(execGtidSet)
coords, err := mysql.NewGTIDBinlogCoordinates(mysql.MySQLFlavor, execGtidSet)
if err != nil {
return err
}
es.initialBinlogCoordinates = &mysql.GTIDBinlogCoordinates{GTIDSet: gtidSet.(*gomysql.MysqlGTIDSet)}
es.initialBinlogCoordinates = coords
} else {
es.initialBinlogCoordinates = &mysql.FileBinlogCoordinates{
LogFile: m.GetString("File"),
Expand All @@ -192,6 +196,23 @@ func (es *EventsStreamer) readCurrentBinlogCoordinates() error {
return nil
}

// readCurrentMariaDBGTIDCoordinates reads the current binlog GTID position from
// a MariaDB server, which is exposed via @@global.gtid_binlog_pos rather than a
// column in SHOW MASTER STATUS.
func (es *EventsStreamer) readCurrentMariaDBGTIDCoordinates() error {
var gtidBinlogPos string
if err := es.db.QueryRow(`select @@global.gtid_binlog_pos`).Scan(&gtidBinlogPos); err != nil {
return err
}
coords, err := mysql.NewGTIDBinlogCoordinates(mysql.MariaDBFlavor, gtidBinlogPos)
if err != nil {
return err
}
es.initialBinlogCoordinates = coords
es.migrationContext.Log.Debugf("Streamer binlog coordinates: %+v", es.initialBinlogCoordinates)
return nil
}

// StreamEvents will begin streaming events. It will be blocking, so should be
// executed by a goroutine
func (es *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
Expand Down
38 changes: 32 additions & 6 deletions go/mysql/binlog_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func TestBinlogCoordinates(t *testing.T) {
48e2bc1d-d66d-11e8-bf56-a0369f9437b8:1,
492e2980-4518-11e9-92c6-e4434b3eca94:1-4926754399`)

c5 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)}
c6 := GTIDBinlogCoordinates{GTIDSet: gtidSet1.(*gomysql.MysqlGTIDSet)}
c7 := GTIDBinlogCoordinates{GTIDSet: gtidSet2.(*gomysql.MysqlGTIDSet)}
c8 := GTIDBinlogCoordinates{GTIDSet: gtidSet3.(*gomysql.MysqlGTIDSet)}
c9 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig1.(*gomysql.MysqlGTIDSet)}
c10 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig2.(*gomysql.MysqlGTIDSet)}
c5 := GTIDBinlogCoordinates{GTIDSet: gtidSet1}
c6 := GTIDBinlogCoordinates{GTIDSet: gtidSet1}
c7 := GTIDBinlogCoordinates{GTIDSet: gtidSet2}
c8 := GTIDBinlogCoordinates{GTIDSet: gtidSet3}
c9 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig1}
c10 := GTIDBinlogCoordinates{GTIDSet: gtidSetBig2}

require.True(t, c5.Equals(&c6))
require.True(t, c1.Equals(&c2))
Expand All @@ -76,6 +76,32 @@ func TestBinlogCoordinates(t *testing.T) {
require.True(t, c9.SmallerThanOrEquals(&c10))
}

func TestMariaDBGTIDBinlogCoordinates(t *testing.T) {
// MariaDB GTID sets use domain-server-sequence format.
c1, err := NewGTIDBinlogCoordinates(MariaDBFlavor, "0-1-100")
require.NoError(t, err)
c2, err := NewGTIDBinlogCoordinates(MariaDBFlavor, "0-1-100")
require.NoError(t, err)
c3, err := NewGTIDBinlogCoordinates(MariaDBFlavor, "0-1-150")
require.NoError(t, err)

require.True(t, c1.Equals(c2))
require.False(t, c1.Equals(c3))
require.True(t, c1.SmallerThan(c3))
require.False(t, c3.SmallerThan(c1))
require.True(t, c1.SmallerThanOrEquals(c3))

clone := c1.Clone()
require.True(t, c1.Equals(clone))
require.False(t, c1.IsEmpty())
}

func TestFlavorFor(t *testing.T) {
require.Equal(t, MariaDBFlavor, FlavorFor("10.6.18-MariaDB-log"))
require.Equal(t, MySQLFlavor, FlavorFor("8.0.36"))
require.Equal(t, MySQLFlavor, FlavorFor("8.4.0"))
}

func TestBinlogCoordinatesAsKey(t *testing.T) {
m := make(map[BinlogCoordinates]bool)

Expand Down
43 changes: 34 additions & 9 deletions go/mysql/binlog_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,39 @@ import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
)

// GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format.
// Re-exported go-mysql flavor identifiers so the rest of gh-ost doesn't have to
// import go-mysql directly to talk about flavors.
const (
MySQLFlavor = gomysql.MySQLFlavor
MariaDBFlavor = gomysql.MariaDBFlavor
)

// FlavorFor returns the go-mysql flavor identifier for the given server version
// string. It is used to parse GTID sets and to configure the binlog syncer in
// the correct (MySQL vs MariaDB) GTID dialect.
func FlavorFor(mysqlVersion string) string {
if IsMariaDB(mysqlVersion) {
return MariaDBFlavor
}
return MySQLFlavor
}

// GTIDBinlogCoordinates describe binary log coordinates as a GTID set. The
// underlying set is either a MySQL or a MariaDB GTID set depending on the
// flavor it was parsed with; all operations go through the gomysql.GTIDSet
// interface so the two flavors are handled uniformly.
type GTIDBinlogCoordinates struct {
GTIDSet *gomysql.MysqlGTIDSet
GTIDSet gomysql.GTIDSet
}

// NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct.
func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) {
set, err := gomysql.ParseMysqlGTIDSet(gtidSet)
return &GTIDBinlogCoordinates{
GTIDSet: set.(*gomysql.MysqlGTIDSet),
}, err
// NewGTIDBinlogCoordinates parses a GTID set string (in the given flavor's
// dialect) into a *GTIDBinlogCoordinates struct.
func NewGTIDBinlogCoordinates(flavor, gtidSet string) (*GTIDBinlogCoordinates, error) {
set, err := gomysql.ParseGTIDSet(flavor, gtidSet)
if err != nil {
return nil, err
}
return &GTIDBinlogCoordinates{GTIDSet: set}, nil
}

// DisplayString returns a user-friendly string representation of these current UUID set or the full GTID set.
Expand All @@ -29,6 +51,9 @@ func (coord *GTIDBinlogCoordinates) DisplayString() string {

// String returns a user-friendly string representation of these full GTID set.
func (coord GTIDBinlogCoordinates) String() string {
if coord.GTIDSet == nil {
return ""
}
return coord.GTIDSet.String()
}

Expand Down Expand Up @@ -74,7 +99,7 @@ func (coord *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates)
func (coord *GTIDBinlogCoordinates) Clone() BinlogCoordinates {
out := &GTIDBinlogCoordinates{}
if coord.GTIDSet != nil {
out.GTIDSet = coord.GTIDSet.Clone().(*gomysql.MysqlGTIDSet)
out.GTIDSet = coord.GTIDSet.Clone()
}
return out
}
9 changes: 8 additions & 1 deletion go/mysql/replica_terminology_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ var MysqlReplicaTermMap = map[string]string{
"slave": "replica",
}

// IsMariaDB reports whether the given server version string identifies a
// MariaDB server (as opposed to Oracle MySQL). MariaDB reports versions >= 10
// and differs from MySQL in replica terminology and GTID handling.
func IsMariaDB(mysqlVersion string) bool {
return strings.Contains(strings.ToLower(mysqlVersion), "mariadb")
}

func ReplicaTermFor(mysqlVersion string, term string) string {
// MariaDB reports versions >= 10, which compare greater than the 8.4
// cutoff, but it never adopted the new replica/source terminology. Keep
// the legacy terms for it.
if strings.Contains(strings.ToLower(mysqlVersion), "mariadb") {
if IsMariaDB(mysqlVersion) {
return term
}

Expand Down
39 changes: 36 additions & 3 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,17 @@ func GetMasterConnectionConfigSafe(dbVersion string, connectionConfig *Connectio
}

func GetReplicationBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool) (readBinlogCoordinates, executeBinlogCoordinates BinlogCoordinates, err error) {
if gtid && IsMariaDB(dbVersion) {
return getMariaDBReplicationGTIDCoordinates(db)
}
showReplicaStatusQuery := fmt.Sprintf("show %s", ReplicaTermFor(dbVersion, `slave status`))
err = sqlutils.QueryRowsMap(db, showReplicaStatusQuery, func(m sqlutils.RowMap) error {
if gtid {
executeBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set"))
executeBinlogCoordinates, err = NewGTIDBinlogCoordinates(MySQLFlavor, m.GetString("Executed_Gtid_Set"))
if err != nil {
return err
}
readBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Retrieved_Gtid_Set"))
readBinlogCoordinates, err = NewGTIDBinlogCoordinates(MySQLFlavor, m.GetString("Retrieved_Gtid_Set"))
if err != nil {
return err
}
Expand All @@ -244,10 +247,20 @@ func GetReplicationBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool)
}

func GetSelfBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool) (selfBinlogCoordinates BinlogCoordinates, err error) {
if gtid && IsMariaDB(dbVersion) {
// MariaDB does not expose a GTID column in SHOW MASTER STATUS; the
// executed GTID position of this server's own binary log is in
// @@global.gtid_binlog_pos.
var gtidBinlogPos string
if err = db.QueryRow(`select @@global.gtid_binlog_pos`).Scan(&gtidBinlogPos); err != nil {
return nil, err
}
return NewGTIDBinlogCoordinates(MariaDBFlavor, gtidBinlogPos)
}
binaryLogStatusTerm := ReplicaTermFor(dbVersion, "master status")
err = sqlutils.QueryRowsMap(db, fmt.Sprintf("show %s", binaryLogStatusTerm), func(m sqlutils.RowMap) error {
if gtid {
selfBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set"))
selfBinlogCoordinates, err = NewGTIDBinlogCoordinates(MySQLFlavor, m.GetString("Executed_Gtid_Set"))
} else {
selfBinlogCoordinates = NewFileBinlogCoordinates(
m.GetString("File"),
Expand All @@ -259,6 +272,26 @@ func GetSelfBinlogCoordinates(dbVersion string, db *gosql.DB, gtid bool) (selfBi
return selfBinlogCoordinates, err
}

// getMariaDBReplicationGTIDCoordinates reports the IO/SQL thread GTID positions
// of a MariaDB replica. MariaDB has no Executed_Gtid_Set/Retrieved_Gtid_Set
// columns: the IO thread position is in SHOW SLAVE STATUS's Gtid_IO_Pos, and the
// applied position is in @@global.gtid_slave_pos.
func getMariaDBReplicationGTIDCoordinates(db *gosql.DB) (readBinlogCoordinates, executeBinlogCoordinates BinlogCoordinates, err error) {
err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error {
readBinlogCoordinates, err = NewGTIDBinlogCoordinates(MariaDBFlavor, m.GetString("Gtid_IO_Pos"))
return err
})
if err != nil {
return readBinlogCoordinates, executeBinlogCoordinates, err
}
var gtidSlavePos string
if err = db.QueryRow(`select @@global.gtid_slave_pos`).Scan(&gtidSlavePos); err != nil {
return readBinlogCoordinates, executeBinlogCoordinates, err
}
executeBinlogCoordinates, err = NewGTIDBinlogCoordinates(MariaDBFlavor, gtidSlavePos)
return readBinlogCoordinates, executeBinlogCoordinates, err
}

// GetInstanceKey reads hostname and port on given DB
func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
instanceKey = &InstanceKey{}
Expand Down
Loading
Loading