Skip to content

Commit

Permalink
feat: create sequence backend (#820)
Browse files Browse the repository at this point in the history
* create sequence backend

* comment fixes

* change

* tests
  • Loading branch information
asthamohta committed May 29, 2024
1 parent 0790b80 commit 4af4747
Show file tree
Hide file tree
Showing 24 changed files with 1,833 additions and 84 deletions.
20 changes: 9 additions & 11 deletions accessors/spanner/spanner_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ var (
// This number should not be too high so as to not hit the AdminQuota limit.
// AdminQuota limits are mentioned here: https://cloud.google.com/spanner/quotas#administrative_limits
// If facing a quota limit error, consider reducing this value.
MaxWorkers = 50
MaxWorkers = 50
)


// The SpannerAccessor provides methods that internally use a spanner client (can be adminClient/databaseclient/instanceclient etc).
// Methods should only contain generic logic here that can be used by multiple workflows.
type SpannerAccessor interface {
Expand All @@ -64,11 +63,11 @@ type SpannerAccessor interface {
// Create a change stream with default options.
CreateChangeStream(ctx context.Context, adminClient spanneradmin.AdminClient, changeStreamName, dbURI string) error
// Create new Database using conv
CreateDatabase(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI string, conv *internal.Conv, driver string, migrationType string) error
CreateDatabase(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI string, conv *internal.Conv, driver string, migrationType string) error
// Update Database using conv
UpdateDatabase(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI string, conv *internal.Conv, driver string) error
// Updates an existing Spanner database or create a new one if one does not exist using Conv
CreateOrUpdateDatabase(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI, driver string, conv *internal.Conv, migrationType string) error
CreateOrUpdateDatabase(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI, driver string, conv *internal.Conv, migrationType string) error
// Check whether the db exists and if it does, verify if the schema is what we currently support.
VerifyDb(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI string) (dbExists bool, err error)
// Verify if an existing DB's ddl follows what is supported by Spanner migration tool. Currently, we only support empty schema when db already exists.
Expand Down Expand Up @@ -253,9 +252,9 @@ func (sp *SpannerAccessorImpl) CreateDatabase(ctx context.Context, adminClient s
} else {
req.CreateStatement = "CREATE DATABASE `" + dbName + "`"
if migrationType == constants.DATAFLOW_MIGRATION {
req.ExtraStatements = conv.SpSchema.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver})
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
} else {
req.ExtraStatements = conv.SpSchema.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver})
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
}

}
Expand All @@ -281,7 +280,7 @@ func (sp *SpannerAccessorImpl) UpdateDatabase(ctx context.Context, adminClient s
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Foreign Keys are set to false since we create them post data migration.
schema := conv.SpSchema.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver})
schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
req := &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: schema,
Expand Down Expand Up @@ -348,7 +347,6 @@ func (sp *SpannerAccessorImpl) ValidateDDL(ctx context.Context, adminClient span
return nil
}


// UpdateDDLForeignKeys updates the Spanner database with foreign key
// constraints using ALTER TABLE statements.
func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, adminClient spanneradmin.AdminClient, dbURI string, conv *internal.Conv, driver string, migrationType string) {
Expand All @@ -361,7 +359,7 @@ func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, adminCl
// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
fkStmts := conv.SpSchema.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver})
fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
if len(fkStmts) == 0 {
return
}
Expand Down Expand Up @@ -408,12 +406,12 @@ func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, adminCl
Statements: []string{fkStmt},
})
if err != nil {
logger.Log.Debug("Can't add foreign key with statement:"+fkStmt+"\n due to error:"+err.Error()+" Skipping this foreign key...\n")
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
return
}
if err := op.Wait(ctx); err != nil {
logger.Log.Debug("Can't add foreign key with statement:"+fkStmt+"\n due to error:"+err.Error()+" Skipping this foreign key...\n")
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
return
}
Expand Down
3 changes: 2 additions & 1 deletion common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ const (
SMT_JOB_TABLE string = "SMT_JOB"
SMT_RESOURCE_TABLE string = "SMT_RESOURCE"
// Auto Generated Keys
UUID string = "UUID"
UUID string = "UUID"
SEQUENCE string = "Sequence"
// Default gcs path of the Dataflow template.
DEFAULT_TEMPLATE_PATH string = "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner"
)
5 changes: 2 additions & 3 deletions conversion/store_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func WriteSchemaFile(conv *internal.Conv, now time.Time, name string, out *os.Fi
// and doesn't add backticks around table and column names. This file is
// intended for explanatory and documentation purposes, and is not strictly
// legal Cloud Spanner DDL (Cloud Spanner doesn't currently support comments).
spDDL := conv.SpSchema.GetDDL(ddl.Config{Comments: true, ProtectIds: false, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver})
spDDL := ddl.GetDDL(ddl.Config{Comments: true, ProtectIds: false, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
if len(spDDL) == 0 {
spDDL = []string{"\n-- Schema is empty -- no tables found\n"}
}
Expand All @@ -69,7 +69,7 @@ func WriteSchemaFile(conv *internal.Conv, now time.Time, name string, out *os.Fi

// We change 'Comments' to false and 'ProtectIds' to true below to write out a
// schema file that is a legal Cloud Spanner DDL.
spDDL = conv.SpSchema.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver})
spDDL = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
if len(spDDL) == 0 {
spDDL = []string{"\n-- Schema is empty -- no tables found\n"}
}
Expand Down Expand Up @@ -202,7 +202,6 @@ func WriteBadData(bw *writer.BatchWriter, conv *internal.Conv, banner, name stri
fmt.Fprintf(out, "See file '%s' for details of bad rows\n", name)
}


// writeBadStreamingData writes sample of bad records and dropped records during streaming
// migration process to bad data file.
func writeBadStreamingData(conv *internal.Conv, f *os.File) error {
Expand Down
135 changes: 135 additions & 0 deletions conversion/store_files_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conversion

import (
"path/filepath"
"testing"

"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
"github.com/stretchr/testify/assert"
)

// TestReadSessionFile verifies that ReadSessionFile populates a Conv from a
// stored session JSON file, covering session files both with and without
// Spanner sequence definitions.
func TestReadSessionFile(t *testing.T) {
	// createExpectedConv builds the Conv expected after reading the basic
	// session file, excluding any sequence information (sequences are added
	// by the caller for the case that needs them).
	createExpectedConv := func() *internal.Conv {
		expectedConv := internal.MakeConv()
		expectedConv.SpSchema = map[string]ddl.CreateTable{
			"t1": {
				Name:          "numbers",
				ColIds:        []string{"c1", "c2"},
				ShardIdColumn: "c1",
				ColDefs: map[string]ddl.ColumnDef{
					"c1": {
						Name:    "id",
						NotNull: true,
						Comment: "From: id int(10)",
						Id:      "c2",
					},
					"c2": {
						Name:    "value",
						NotNull: false,
						Id:      "c2",
					},
				},
				PrimaryKeys: []ddl.IndexKey{
					{
						ColId: "c1",
						Order: 1,
					},
				},
				Comment: "Spanner schema for source table numbers",
				Id:      "t1",
			},
		}
		expectedConv.SrcSchema = map[string]schema.Table{
			"t1": {
				Name:   "numbers",
				Schema: "default",
				ColIds: []string{"c1", "c2"},
				ColDefs: map[string]schema.Column{
					"c1": {
						Name: "id",
						Type: schema.Type{
							Name: "int",
							Mods: []int64{10},
						},
						NotNull: true,
						Id:      "c1",
					},
					"c2": {
						Name: "value",
						Type: schema.Type{
							Name: "int",
							Mods: []int64{10},
						},
						NotNull: true,
						Id:      "c2",
					},
				},
				PrimaryKeys: []schema.Key{
					{
						ColId: "c1",
						Desc:  false,
						Order: 1,
					},
				},
				Id: "t1",
			},
		}
		expectedConv.SchemaIssues = map[string]internal.TableIssues{
			"t1": {
				ColumnLevelIssues: map[string][]internal.SchemaIssue{
					"c1": {14},
				},
			},
		}
		return expectedConv
	}
	expectedConvWithSequences := createExpectedConv()
	expectedConvWithSequences.SpSequences = map[string]ddl.Sequence{
		"s1": {
			Name:         "Seq",
			Id:           "s1",
			SequenceKind: "BIT REVERSED POSITIVE",
		},
	}
	testCases := []struct {
		name         string
		filePath     string
		expectedConv *internal.Conv
		expectError  bool
	}{
		{
			name:         "test basic session file",
			filePath:     filepath.Join("..", "test_data", "basic_session_file_test.json"),
			expectedConv: expectedConvWithSequences,
			expectError:  false,
		},
		{
			name:         "test session file without sequences",
			filePath:     filepath.Join("..", "test_data", "basic_sessions_file_wo_sequences_test.json"),
			expectedConv: createExpectedConv(),
			expectError:  false,
		},
	}
	for _, tc := range testCases {
		tc := tc // capture range variable for pre-Go-1.22 toolchains
		t.Run(tc.name, func(t *testing.T) {
			conv := internal.MakeConv()
			err := ReadSessionFile(conv, tc.filePath)
			assert.Equal(t, tc.expectError, err != nil)
			// Compare the values directly rather than the addresses of the
			// local variables; testify/DeepEqual dereferences *Conv.
			assert.Equal(t, tc.expectedConv, conv)
		})
	}
}
34 changes: 19 additions & 15 deletions internal/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@ type Conv struct {
ToSource map[string]NameAndCols `json:"-"` // Maps from Spanner table name to source-DB table name and column mapping.
UsedNames map[string]bool `json:"-"` // Map storing the names that are already assigned to tables, indices or foreign key contraints.
dataSink func(table string, cols []string, values []interface{})
DataFlush func() `json:"-"` // Data flush is used to flush out remaining writes and wait for them to complete.
Location *time.Location // Timezone (for timestamp conversion).
sampleBadRows rowSamples // Rows that generated errors during conversion.
Stats stats `json:"-"`
TimezoneOffset string // Timezone offset for timestamp conversion.
SpDialect string // The dialect of the spanner database to which Spanner migration tool is writing.
UniquePKey map[string][]string // Maps Spanner table name to unique column name being used as primary key (if needed).
Audit Audit `json:"-"` // Stores the audit information for the database conversion
Rules []Rule // Stores applied rules during schema conversion
IsSharded bool // Flag denoting if the migration is sharded or not
ConvLock sync.RWMutex `json:"-"` // ConvLock prevents concurrent map read/write operations. This lock will be used in all the APIs that either read or write elements to the conv object.
SpRegion string // Leader Region for Spanner Instance
ResourceValidation bool // Flag denoting if validation for resources to generated is complete
UI bool // Flag if UI interface was used for migration. ToDo: Remove flag after resource generation is introduced to UI
DataFlush func() `json:"-"` // Data flush is used to flush out remaining writes and wait for them to complete.
Location *time.Location // Timezone (for timestamp conversion).
sampleBadRows rowSamples // Rows that generated errors during conversion.
Stats stats `json:"-"`
TimezoneOffset string // Timezone offset for timestamp conversion.
SpDialect string // The dialect of the spanner database to which Spanner migration tool is writing.
UniquePKey map[string][]string // Maps Spanner table name to unique column name being used as primary key (if needed).
Audit Audit `json:"-"` // Stores the audit information for the database conversion
Rules []Rule // Stores applied rules during schema conversion
IsSharded bool // Flag denoting if the migration is sharded or not
ConvLock sync.RWMutex `json:"-"` // ConvLock prevents concurrent map read/write operations. This lock will be used in all the APIs that either read or write elements to the conv object.
SpRegion string // Leader Region for Spanner Instance
ResourceValidation bool // Flag denoting if validation for resources to generated is complete
UI bool // Flag if UI interface was used for migration. ToDo: Remove flag after resource generation is introduced to UI
SpSequences map[string]ddl.Sequence // Maps Spanner Sequences to Sequence Schema
SrcSequences map[string]ddl.Sequence // Maps source-DB Sequences to Sequence schema information
}

type TableIssues struct {
Expand Down Expand Up @@ -306,7 +308,9 @@ func MakeConv() *Conv {
StreamingStats: streamingStats{},
MigrationType: migration.MigrationData_SCHEMA_ONLY.Enum(),
},
Rules: []Rule{},
Rules: []Rule{},
SpSequences: make(map[string]ddl.Sequence),
SrcSequences: make(map[string]ddl.Sequence),
}
}

Expand Down
3 changes: 3 additions & 0 deletions internal/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func GenerateIndexesId() string {
// GenerateRuleId returns a new unique rule ID, prefixed with "r".
func GenerateRuleId() string {
	return GenerateId("r")
}
// GenerateSequenceId returns a new unique sequence ID, prefixed with "s".
func GenerateSequenceId() string {
	return GenerateId("s")
}

func GetSrcColNameIdMap(srcs schema.Table) map[string]string {
if len(srcs.ColNameIdMap) > 0 {
Expand Down
6 changes: 3 additions & 3 deletions sources/mysql/mysqldump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,12 +846,12 @@ func TestProcessMySQLDump_GetDDL(t *testing.T) {
"ALTER TABLE cart ADD CONSTRAINT cart_pkey PRIMARY KEY (productid, userid);")
expected :=
"CREATE TABLE cart (\n" +
" productid STRING(MAX) NOT NULL,\n" +
" userid STRING(MAX) NOT NULL,\n" +
" productid STRING(MAX) NOT NULL ,\n" +
" userid STRING(MAX) NOT NULL ,\n" +
" quantity INT64,\n" +
") PRIMARY KEY (productid, userid)"
c := ddl.Config{Tables: true}
assert.Equal(t, expected, strings.Join(conv.SpSchema.GetDDL(c), " "))
assert.Equal(t, expected, strings.Join(ddl.GetDDL(c, conv.SpSchema, conv.SpSequences), " "))
}

func TestProcessMySQLDump_Rows(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions sources/postgres/pgdump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,26 +1401,26 @@ func TestProcessPgDump_GetDDL(t *testing.T) {
"ALTER TABLE ONLY cart ADD CONSTRAINT cart_pkey PRIMARY KEY (productid, userid);")
expected :=
"CREATE TABLE cart (\n" +
" productid STRING(MAX) NOT NULL,\n" +
" userid STRING(MAX) NOT NULL,\n" +
" productid STRING(MAX) NOT NULL ,\n" +
" userid STRING(MAX) NOT NULL ,\n" +
" quantity INT64,\n" +
") PRIMARY KEY (productid, userid)"
c := ddl.Config{Tables: true}
assert.Equal(t, expected, strings.Join(conv.SpSchema.GetDDL(c), " "))
assert.Equal(t, expected, strings.Join(ddl.GetDDL(c, conv.SpSchema, conv.SpSequences), " "))
}

// TestProcessPgDump_GetPGDDL checks that converting a pg_dump snippet with a
// PostgreSQL-dialect Spanner target renders the expected PostgreSQL DDL.
func TestProcessPgDump_GetPGDDL(t *testing.T) {
	dump := "CREATE TABLE cart (productid text, userid text, quantity bigint);\n" +
		"ALTER TABLE ONLY cart ADD CONSTRAINT cart_pkey PRIMARY KEY (productid, userid);"
	conv, _ := runProcessPgDumpPGTarget(dump)
	// Render only the table statements, using the dialect chosen during
	// conversion.
	cfg := ddl.Config{Tables: true, SpDialect: conv.SpDialect}
	got := strings.Join(ddl.GetDDL(cfg, conv.SpSchema, conv.SpSequences), " ")
	want := "CREATE TABLE cart (\n" +
		" productid VARCHAR(2621440) NOT NULL ,\n" +
		" userid VARCHAR(2621440) NOT NULL ,\n" +
		" quantity INT8,\n" +
		" PRIMARY KEY (productid, userid)\n" +
		")"
	assert.Equal(t, want, got)
}

func TestProcessPgDump_Rows(t *testing.T) {
Expand Down
Loading

0 comments on commit 4af4747

Please sign in to comment.