Module: Pgbus::PgmqSchema

Defined in:
lib/pgbus/pgmq_schema.rb

Overview

Manages the embedded PGMQ SQL schema for extension-free installations.

Supports three modes:

:auto      - Try extension first, fall back to embedded SQL
:extension - Require the pgmq PostgreSQL extension
:embedded  - Use vendored SQL (no extension needed)
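
The three modes above can be read as a small decision table. The sketch below is illustrative only: `resolve_install_method` and its `extension_available:` flag are hypothetical names, not part of Pgbus, and how the library actually detects the extension is not shown in this page.

```ruby
# Hypothetical helper (not part of Pgbus): maps a configured mode to the
# install method that would be used, given whether the pgmq extension
# is available on the server.
def resolve_install_method(mode, extension_available:)
  case mode
  when :extension
    # :extension is strict: no fallback to the vendored SQL.
    raise ArgumentError, "pgmq extension required but not available" unless extension_available

    :extension
  when :embedded
    # :embedded always uses the vendored SQL, even if the extension exists.
    :embedded
  when :auto
    # :auto prefers the extension and falls back to embedded SQL.
    extension_available ? :extension : :embedded
  else
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end
```

Under these assumptions, `resolve_install_method(:auto, extension_available: false)` yields `:embedded`, while `:extension` with no extension available raises instead of falling back.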

Vendored SQL files live in lib/pgbus/pgmq_schema/ and are named pgmq_v<VERSION>.sql (e.g. pgmq_v1.11.0.sql). Each file is an exact copy of the upstream pgmq-extension/sql/pgmq.sql at that release.

Defined Under Namespace

Classes: VersionNotFoundError

Constant Summary

SCHEMA_DIR =
File.expand_path("pgmq_schema", __dir__).freeze

Class Method Summary

Class Method Details

.available_versions ⇒ Object

Returns sorted list of all vendored PGMQ versions.



# File 'lib/pgbus/pgmq_schema.rb', line 25

def available_versions
  Dir.glob(File.join(SCHEMA_DIR, "pgmq_v*.sql"))
     .map { |f| File.basename(f).match(/pgmq_v(.+)\.sql/)[1] }
     .sort_by { |v| Gem::Version.new(v) }
end
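
Sorting through `Gem::Version` matters because a plain string sort would place "1.10.0" before "1.9.0". The sketch below reruns the same pipeline against a temporary directory; `versions_in`, `demo_versions`, and the file names it creates are illustrative assumptions, not part of Pgbus.

```ruby
require "tmpdir"

# Same pipeline as available_versions, with the directory passed in
# so it can be exercised outside the gem (hypothetical helper).
def versions_in(dir)
  Dir.glob(File.join(dir, "pgmq_v*.sql"))
     .map { |f| File.basename(f).match(/pgmq_v(.+)\.sql/)[1] }
     .sort_by { |v| Gem::Version.new(v) }
end

# Creates placeholder files with made-up version numbers and returns
# the versions in the order the pipeline produces.
def demo_versions
  Dir.mktmpdir do |dir|
    %w[1.9.0 1.10.0 1.2.3].each do |v|
      File.write(File.join(dir, "pgmq_v#{v}.sql"), "-- placeholder\n")
    end
    versions_in(dir)
  end
end

# demo_versions returns ["1.2.3", "1.9.0", "1.10.0"], whereas
# %w[1.9.0 1.10.0 1.2.3].sort returns ["1.10.0", "1.2.3", "1.9.0"].
```

Since `latest_version` takes the last element of this list, the numeric ordering is what makes it return the newest release.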

.drop_pgmq_functions_sql ⇒ Object

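Returns SQL that drops every function in the pgmq schema, plus every composite type in that schema that is not the row type of a table or other relation (for a clean upgrade). Uses CASCADE, so dependent objects are dropped as well.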



# File 'lib/pgbus/pgmq_schema.rb', line 97

def drop_pgmq_functions_sql
  <<~SQL
    DO $$
    DECLARE
      r RECORD;
    BEGIN
      -- Drop all functions in pgmq schema
      FOR r IN
        SELECT pg_catalog.pg_get_functiondef(p.oid) AS funcdef,
               n.nspname || '.' || p.proname || '(' ||
                 pg_catalog.pg_get_function_identity_arguments(p.oid) || ')' AS func_sig
        FROM pg_catalog.pg_proc p
        JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
        WHERE n.nspname = 'pgmq'
      LOOP
        EXECUTE 'DROP FUNCTION IF EXISTS ' || r.func_sig || ' CASCADE';
      END LOOP;

      -- Drop custom types in pgmq schema
      FOR r IN
        SELECT n.nspname || '.' || t.typname AS type_name
        FROM pg_catalog.pg_type t
        JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
        WHERE n.nspname = 'pgmq'
          AND t.typtype = 'c'
          AND NOT EXISTS (
            SELECT 1 FROM pg_catalog.pg_class c
            WHERE c.reltype = t.oid
          )
      LOOP
        EXECUTE 'DROP TYPE IF EXISTS ' || r.type_name || ' CASCADE';
      END LOOP;
    END $$;
  SQL
end

.install_sql(version = latest_version) ⇒ String

Returns the SQL to install the PGMQ schema without the extension. Strips the extension-only pg_dump config blocks, which are irrelevant when PGMQ is not installed as an extension.

Parameters:

  • version (String) (defaults to: latest_version)

    defaults to latest

Returns:

  • (String)

    SQL



# File 'lib/pgbus/pgmq_schema.rb', line 54

def install_sql(version = latest_version)
  sql = sql_for_version(version)
  strip_extension_only_blocks(sql)
end

.latest_version ⇒ Object

Returns the latest vendored PGMQ version string.



# File 'lib/pgbus/pgmq_schema.rb', line 20

def latest_version
  available_versions.last
end

.sql_for_version(version) ⇒ Object

Returns the raw SQL content for a given version.



# File 'lib/pgbus/pgmq_schema.rb', line 44

def sql_for_version(version)
  File.read(sql_path(version))
end

.sql_path(version) ⇒ String

Returns the filesystem path to the vendored SQL file for a given version.

Parameters:

  • version (String)

    e.g. “1.11.0”

Returns:

  • (String)

    absolute path

Raises:

  • (VersionNotFoundError)

    if no vendored SQL file exists for the given version



# File 'lib/pgbus/pgmq_schema.rb', line 36

def sql_path(version)
  path = File.join(SCHEMA_DIR, "pgmq_v#{version}.sql")
  raise VersionNotFoundError, "No vendored PGMQ SQL for version #{version}" unless File.exist?(path)

  path
end
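
Callers that accept a version from configuration may want to handle the missing-file case explicitly. The sketch below mirrors the lookup with stand-in names so it runs on its own: `DemoVersionNotFoundError`, `demo_sql_path`, and `path_or_nil` are hypothetical, and the assumption that the error descends from StandardError is not confirmed by this page.

```ruby
require "tmpdir"

# Stand-in for Pgbus::PgmqSchema::VersionNotFoundError (assumed to be a
# StandardError subclass; the real superclass is not shown here).
class DemoVersionNotFoundError < StandardError; end

# Mirrors sql_path, with the schema directory passed in explicitly.
def demo_sql_path(schema_dir, version)
  path = File.join(schema_dir, "pgmq_v#{version}.sql")
  raise DemoVersionNotFoundError, "No vendored PGMQ SQL for version #{version}" unless File.exist?(path)

  path
end

# Returns the path when the version is vendored, nil otherwise.
def path_or_nil(schema_dir, version)
  demo_sql_path(schema_dir, version)
rescue DemoVersionNotFoundError
  nil
end
```

Because `sql_for_version` and `install_sql` both go through `sql_path`, the same error would surface from them for an unknown version.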

.version_tracking_extension_sql(version = latest_version) ⇒ String

Returns SQL that creates the version tracking table if it does not exist and records an extension-based installation in it.

Parameters:

  • version (String) (defaults to: latest_version)

Returns:

  • (String)

    SQL



# File 'lib/pgbus/pgmq_schema.rb', line 81

def version_tracking_extension_sql(version = latest_version)
  <<~SQL
    CREATE TABLE IF NOT EXISTS pgbus_pgmq_schema_versions (
      id SERIAL PRIMARY KEY,
      version VARCHAR NOT NULL,
      installed_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
      install_method VARCHAR NOT NULL DEFAULT 'embedded'
    );

    INSERT INTO pgbus_pgmq_schema_versions (version, install_method)
    VALUES ('#{version}', 'extension');
  SQL
end

.version_tracking_sql(version = latest_version) ⇒ String

Returns SQL to create the version tracking table and record an installation.

Parameters:

  • version (String) (defaults to: latest_version)

    defaults to latest

Returns:

  • (String)

    SQL



# File 'lib/pgbus/pgmq_schema.rb', line 63

def version_tracking_sql(version = latest_version)
  <<~SQL
    CREATE TABLE IF NOT EXISTS pgbus_pgmq_schema_versions (
      id SERIAL PRIMARY KEY,
      version VARCHAR NOT NULL,
      installed_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
      install_method VARCHAR NOT NULL DEFAULT 'embedded'
    );

    INSERT INTO pgbus_pgmq_schema_versions (version, install_method)
    VALUES ('#{version}', 'embedded');
  SQL
end
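
Both version tracking methods interpolate `version` directly into the SQL string. That is harmless when the value comes from `latest_version` or `available_versions`, but a caller passing an arbitrary string may want to check it first. The following is a minimal sketch under that assumption; `SAFE_VERSION` and `checked_version` are hypothetical names, not part of Pgbus, and the dotted-digits pattern is only a guess at what vendored versions look like.

```ruby
# Hypothetical guard (not part of Pgbus): accept only dotted numeric
# versions such as "1.11.0" before they reach string interpolation.
SAFE_VERSION = /\A\d+(?:\.\d+)*\z/

def checked_version(version)
  v = version.to_s
  raise ArgumentError, "unexpected version string: #{v.inspect}" unless v.match?(SAFE_VERSION)

  v
end
```

A caller could then pass `checked_version(user_input)` to `version_tracking_sql` or `version_tracking_extension_sql` instead of the raw value.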