From d403e18d258d42354a30152f29f25cc799a32b42 Mon Sep 17 00:00:00 2001 From: Jacob Bohanon Date: Fri, 13 Feb 2026 00:09:11 -0500 Subject: [PATCH] initial commit --- datastores/sql/postgres/config.go | 80 ++++++++++++ datastores/sql/postgres/migrate.go | 201 +++++++++++++++++++++++++++++ datastores/sql/postgres/pool.go | 66 ++++++++++ go.mod | 12 ++ go.sum | 19 +++ 5 files changed, 378 insertions(+) create mode 100644 datastores/sql/postgres/config.go create mode 100644 datastores/sql/postgres/migrate.go create mode 100644 datastores/sql/postgres/pool.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/datastores/sql/postgres/config.go b/datastores/sql/postgres/config.go new file mode 100644 index 0000000..ba038a6 --- /dev/null +++ b/datastores/sql/postgres/config.go @@ -0,0 +1,80 @@ +package postgres + +import ( + "context" + "fmt" + "os" + + "github.com/jackc/pgx/v5" +) + +// Config holds the database configuration. +type Config struct { + ctx context.Context + Host string + Port string + User string + Password string + DBName string + SSLMode string +} + +// ConfigOption is a functional option for Config. +type ConfigOption func(*Config) + +// WithContext sets the context on the config. +func WithContext(ctx context.Context) ConfigOption { + return func(c *Config) { + c.ctx = ctx + } +} + +// NewConfig creates a new database configuration from environment variables. +// DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_SSL_MODE fall back to sensible +// defaults. DB_NAME is required and must be set in the environment. +func NewConfig(opts ...ConfigOption) *Config { + c := &Config{ + Host: getEnvOrDefault("DB_HOST", "localhost"), + Port: getEnvOrDefault("DB_PORT", "5432"), + User: getEnvOrDefault("DB_USER", "postgres"), + Password: getEnvOrDefault("DB_PASSWORD", "postgres"), + DBName: os.Getenv("DB_NAME"), + SSLMode: getEnvOrDefault("DB_SSL_MODE", "disable"), + } + for _, opt := range opts { + opt(c) + } + return c +} + +// DSN returns the database connection string in key=value format. +func (c *Config) DSN() string { + return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=%s", + c.Host, c.Port, c.User, c.Password, c.DBName, c.SSLMode) +} + +// Connect creates a new single database connection using pgx. +func (c *Config) Connect() (*pgx.Conn, error) { + conn, err := pgx.Connect(c.Context(), c.DSN()) + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %w", err) + } + return conn, nil +} + +// Context returns the context associated with this config, defaulting to +// context.Background() if none was set. +func (c *Config) Context() context.Context { + if c.ctx == nil { + c.ctx = context.Background() + } + return c.ctx +} + +// getEnvOrDefault returns the value of an environment variable or a fallback. +func getEnvOrDefault(key, fallback string) string { + if value := os.Getenv(key); value != "" { + return value + } + return fallback +} diff --git a/datastores/sql/postgres/migrate.go b/datastores/sql/postgres/migrate.go new file mode 100644 index 0000000..b74087c --- /dev/null +++ b/datastores/sql/postgres/migrate.go @@ -0,0 +1,201 @@ +package postgres + +import ( + "context" + "database/sql" + "fmt" + "os" + "path/filepath" + "sort" + "strings" + "time" + + _ "github.com/jackc/pgx/v5/stdlib" // Register pgx driver for database/sql +) + +// MigrateOptions configures the migration runner. +type MigrateOptions struct { + // MigrationsDir is the directory containing *.up.sql migration files. + // If empty, it defaults to "migrations" relative to the working directory. + MigrationsDir string + + // BootstrapTable is the name of a table whose existence signals that the + // database already has a schema applied before the schema_migrations + // tracking table was introduced. When non-empty and this table exists but + // schema_migrations does not, all discovered migrations are recorded as + // already applied (bootstrapped). When empty, no bootstrapping is performed. + BootstrapTable string +} + +// Migrate runs forward (up) migrations using the config's DSN. +// It discovers *.up.sql files in the configured directory, tracks applied +// versions in a schema_migrations table, and applies any pending migrations +// in filename-sorted order. +func (c *Config) Migrate(opts *MigrateOptions) error { + if opts == nil { + opts = &MigrateOptions{} + } + + db, err := sql.Open("pgx", c.DSN()) + if err != nil { + return fmt.Errorf("failed to open database: %w", err) + } + defer db.Close() + + if err := db.Ping(); err != nil { + return fmt.Errorf("failed to ping database: %w", err) + } + + ctx := context.Background() + + migrationsTableExists := tableExists(ctx, db, "schema_migrations") + + if err := ensureMigrationsTable(ctx, db); err != nil { + return fmt.Errorf("failed to create migrations table: %w", err) + } + + migrationsPath := opts.MigrationsDir + if migrationsPath == "" { + wd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get working directory: %w", err) + } + migrationsPath = filepath.Join(wd, "migrations") + } + + migrations, err := discoverMigrations(migrationsPath) + if err != nil { + return fmt.Errorf("failed to discover migrations: %w", err) + } + + // Bootstrap: if schema_migrations was just created but the database already + // has a schema, mark all discovered migrations as already applied. + if !migrationsTableExists && opts.BootstrapTable != "" { + if err := bootstrapMigrationState(ctx, db, migrations, opts.BootstrapTable); err != nil { + return fmt.Errorf("failed to bootstrap migration state: %w", err) + } + } + + applied, err := getAppliedMigrations(ctx, db) + if err != nil { + return fmt.Errorf("failed to get applied migrations: %w", err) + } + + for _, migration := range migrations { + if applied[migration] { + continue + } + + migrationSQL, err := os.ReadFile(filepath.Join(migrationsPath, migration)) + if err != nil { + return fmt.Errorf("failed to read migration file %s: %w", migration, err) + } + + if _, err := db.ExecContext(ctx, string(migrationSQL)); err != nil { + return fmt.Errorf("failed to execute migration %s: %w", migration, err) + } + + if err := recordMigration(ctx, db, migration); err != nil { + return fmt.Errorf("failed to record migration %s: %w", migration, err) + } + + fmt.Printf("Applied migration: %s\n", migration) + } + + return nil +} + +// ensureMigrationsTable creates the schema_migrations table if it doesn't exist. +func ensureMigrationsTable(ctx context.Context, db *sql.DB) error { + _, err := db.ExecContext(ctx, ` + CREATE TABLE IF NOT EXISTS schema_migrations ( + version VARCHAR(255) PRIMARY KEY, + applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `) + return err +} + +// tableExists checks if a table exists in the database. +func tableExists(ctx context.Context, db *sql.DB, tableName string) bool { + var exists bool + err := db.QueryRowContext(ctx, + "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", + tableName, + ).Scan(&exists) + return err == nil && exists +} + +// bootstrapMigrationState marks all discovered migrations as applied when the +// database already contains a schema (detected via sentinelTable) but has no +// schema_migrations table yet. +func bootstrapMigrationState(ctx context.Context, db *sql.DB, migrations []string, sentinelTable string) error { + if !tableExists(ctx, db, sentinelTable) { + // Fresh database, no bootstrapping needed. + return nil + } + + for _, migration := range migrations { + if err := recordMigration(ctx, db, migration); err != nil { + return fmt.Errorf("failed to record bootstrapped migration %s: %w", migration, err) + } + fmt.Printf("Bootstrapped migration (already applied): %s\n", migration) + } + + return nil +} + +// getAppliedMigrations returns a set of already-applied migration filenames. +func getAppliedMigrations(ctx context.Context, db *sql.DB) (map[string]bool, error) { + applied := make(map[string]bool) + + rows, err := db.QueryContext(ctx, "SELECT version FROM schema_migrations") + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var version string + if err := rows.Scan(&version); err != nil { + return nil, err + } + applied[version] = true + } + + return applied, rows.Err() +} + +// recordMigration records a migration version as applied. +func recordMigration(ctx context.Context, db *sql.DB, version string) error { + _, err := db.ExecContext(ctx, + "INSERT INTO schema_migrations (version, applied_at) VALUES ($1, $2)", + version, time.Now(), + ) + return err +} + +// discoverMigrations finds all *.up.sql files in the given directory and +// returns them sorted by filename (relies on numeric prefixes like +// 000001_, 000002_, etc.). +func discoverMigrations(migrationsPath string) ([]string, error) { + entries, err := os.ReadDir(migrationsPath) + if err != nil { + return nil, err + } + + var migrations []string + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if strings.HasSuffix(name, ".up.sql") { + migrations = append(migrations, name) + } + } + + sort.Strings(migrations) + + return migrations, nil +} diff --git a/datastores/sql/postgres/pool.go b/datastores/sql/postgres/pool.go new file mode 100644 index 0000000..2e59170 --- /dev/null +++ b/datastores/sql/postgres/pool.go @@ -0,0 +1,66 @@ +package postgres + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// PoolConfig holds the configuration for the database connection pool. +type PoolConfig struct { + MaxConns int + MinConns int + MaxConnLifetime time.Duration + MaxConnIdleTime time.Duration + HealthCheckPeriod time.Duration +} + +// DefaultPoolConfig returns a reasonable default pool configuration. +func DefaultPoolConfig() *PoolConfig { + return &PoolConfig{ + MaxConns: 10, + MinConns: 2, + MaxConnLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + } +} + +// NewPool creates a new database connection pool. If poolConfig is nil, +// DefaultPoolConfig() is used. +func NewPool(ctx context.Context, config *Config, poolConfig *PoolConfig) (*pgxpool.Pool, error) { + if poolConfig == nil { + poolConfig = DefaultPoolConfig() + } + + pgxConfig, err := pgxpool.ParseConfig(config.DSN()) + if err != nil { + return nil, fmt.Errorf("failed to parse pool config: %w", err) + } + + pgxConfig.MaxConns = int32(poolConfig.MaxConns) + pgxConfig.MinConns = int32(poolConfig.MinConns) + pgxConfig.MaxConnLifetime = poolConfig.MaxConnLifetime + pgxConfig.MaxConnIdleTime = poolConfig.MaxConnIdleTime + pgxConfig.HealthCheckPeriod = poolConfig.HealthCheckPeriod + + pool, err := pgxpool.NewWithConfig(ctx, pgxConfig) + if err != nil { + return nil, fmt.Errorf("failed to create connection pool: %w", err) + } + + if err := pool.Ping(ctx); err != nil { + return nil, fmt.Errorf("failed to ping database: %w", err) + } + + return pool, nil +} + +// ClosePool closes the database connection pool if it is non-nil. +func ClosePool(pool *pgxpool.Pool) { + if pool != nil { + pool.Close() + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..91c4ffc --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module git.nonahob.net/jacob/golibs + +go 1.24.13 + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgx/v5 v5.8.0 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2aca163 --- /dev/null +++ b/go.sum @@ -0,0 +1,19 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=