[PostgreSQL] Recursively deleting a record and everything that references it via foreign keys

A PostgreSQL foreign key constraint can be declared with ON DELETE CASCADE.

...
 FOREIGN KEY (parent_id) REFERENCES parent_table (parent_id) ON DELETE CASCADE

With this option, deleting a record in the parent table automatically deletes the related records in the child tables that reference it.
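
For example, with the (hypothetical) parent_table and child_table below, deleting a parent row automatically removes the child rows that reference it. This is just a minimal sketch to illustrate the behavior; the table and column names are made up:

CREATE TABLE parent_table (
    parent_id BIGINT PRIMARY KEY
);

CREATE TABLE child_table (
    child_id  BIGINT PRIMARY KEY,
    parent_id BIGINT NOT NULL,
    FOREIGN KEY (parent_id) REFERENCES parent_table (parent_id) ON DELETE CASCADE
);

INSERT INTO parent_table VALUES (1);
INSERT INTO child_table VALUES (10, 1), (11, 1);

-- Deleting the parent also deletes the two rows in child_table that reference it.
DELETE FROM parent_table WHERE parent_id = 1;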

When the foreign keys were created without ON DELETE CASCADE, the following function can generate the DELETE statements needed to remove dependent records recursively, just as ON DELETE CASCADE would.

CREATE OR REPLACE FUNCTION generate_recursive_delete_statements(
    p_schema_name TEXT,
    p_table_name TEXT,
    p_pk_values JSONB
)
RETURNS TEXT[] AS $$
/**
 * @description Generates a series of DELETE statements to safely remove a row and all its
 * dependent child records by recursively traversing foreign key constraints.
 * This function correctly handles complex dependencies, including associative/junction tables,
 * by finding the longest dependency path to each record, ensuring a valid topological sort for deletion.
 *
 * @param p_schema_name TEXT - The schema name of the starting table.
 * @param p_table_name TEXT - The table name of the starting table.
 * @param p_pk_values JSONB - A JSON object containing the primary key values of the row to delete.
 * e.g., '{"id": 123}' or '{"key_part1": "a", "key_part2": 1}'
 *
 * @returns TEXT[] - An array of DELETE statements, ordered correctly (children before parents) for execution.
 */
DECLARE
    v_statements TEXT[] := '{}';
    v_target_table_oid OID;
    fk_relation RECORD;
    v_sql TEXT;
    v_rows_affected_in_iteration INT;
BEGIN
    -- Step 1: Validate inputs and prepare the temporary table for the delete plan.
    -----------------------------------------------------------------------------
    BEGIN
        v_target_table_oid := (quote_ident(p_schema_name) || '.' || quote_ident(p_table_name))::regclass::OID;
    EXCEPTION WHEN OTHERS THEN
        RAISE EXCEPTION 'Table not found: %.%', p_schema_name, p_table_name;
    END;

    IF jsonb_typeof(p_pk_values) != 'object' THEN
        RAISE EXCEPTION 'p_pk_values must be a JSON object. e.g., ''{"id": 123}''.';
    END IF;

    -- Clean up the temp table from any previous failed run within the same transaction.
    DROP TABLE IF EXISTS delete_plan;
    
    CREATE TEMP TABLE delete_plan (
        level INT NOT NULL,
        table_oid OID NOT NULL,
        pk_values JSONB NOT NULL,
        UNIQUE (table_oid, pk_values)
    ) ON COMMIT DROP;

    INSERT INTO delete_plan (level, table_oid, pk_values)
    VALUES (0, v_target_table_oid, p_pk_values);

    -- Step 2: Iteratively traverse dependencies until the dependency graph is stable.
    -- This Bellman-Ford-like approach ensures we find the longest path to each node (record),
    -- which is crucial for determining the correct deletion order in complex graphs.
    ------------------------------------------------------------------------------------------------
    LOOP
        v_rows_affected_in_iteration := 0;

        FOR fk_relation IN
            WITH tables_in_plan AS (
                SELECT DISTINCT table_oid FROM delete_plan
            )
            SELECT
                con.conrelid AS child_oid,
                con.confrelid AS parent_oid,
                (SELECT n.nspname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.oid = con.conrelid) AS child_schema,
                (SELECT c.relname FROM pg_class c WHERE c.oid = con.conrelid) AS child_table,
                (SELECT array_agg(a.attname ORDER BY u.ord) FROM unnest(con.conkey) WITH ORDINALITY u(attnum, ord) JOIN pg_attribute a ON a.attrelid = con.conrelid AND a.attnum = u.attnum) AS child_columns,
                (SELECT array_agg(a.attname ORDER BY u.ord) FROM unnest(con.confkey) WITH ORDINALITY u(attnum, ord) JOIN pg_attribute a ON a.attrelid = con.confrelid AND a.attnum = u.attnum) AS parent_columns,
                (SELECT array_agg(t.typname ORDER BY u.ord) FROM unnest(con.confkey) WITH ORDINALITY u(attnum, ord) JOIN pg_attribute a ON a.attrelid = con.confrelid AND a.attnum = u.attnum JOIN pg_type t ON t.oid = a.atttypid) AS parent_col_types,
                pk.pk_def_string AS child_pk_def_string,
                pk.child_pk_cols_for_group_by AS child_pk_cols_for_group_by
            FROM pg_constraint con
            -- Join to get child's primary key definition, which is necessary for finding and grouping child records.
            LEFT JOIN LATERAL (
                SELECT
                    -- Qualify column references with 'child.' to avoid ambiguity in the dynamic SQL.
                    string_agg(format('%L, child.%I', a.attname, a.attname), ', ') AS pk_def_string,
                    string_agg(format('child.%I', a.attname), ', ') AS child_pk_cols_for_group_by
                FROM pg_index i
                JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
                WHERE i.indrelid = con.conrelid AND i.indisprimary
            ) AS pk ON TRUE
            WHERE con.contype = 'f'
              AND con.confrelid IN (SELECT table_oid FROM tables_in_plan) -- Parent table must be in our plan
        LOOP
            IF fk_relation.child_pk_def_string IS NULL THEN
                CONTINUE; -- Skip if child table has no primary key
            END IF;

            DECLARE
                v_parent_data JSONB;
                v_parent_record_def TEXT;
                v_found_children_with_level JSONB;
                v_temp_rows_affected INT;
            BEGIN
                -- Aggregate parent primary keys and their levels for the current relationship.
                -- We include a temporary '__level' key to pass the level into the dynamic query.
                SELECT jsonb_agg(dp.pk_values || jsonb_build_object('__level', dp.level))
                INTO v_parent_data
                FROM delete_plan dp WHERE dp.table_oid = fk_relation.parent_oid;

                IF v_parent_data IS NULL THEN CONTINUE; END IF;

                -- Build the record definition for jsonb_to_recordset, including the level column.
                SELECT string_agg(format('%I %s', fk_relation.parent_columns[i], fk_relation.parent_col_types[i]), ', ') || ', __level int'
                INTO v_parent_record_def
                FROM generate_series(1, array_length(fk_relation.parent_columns, 1)) i;

                -- This dynamic SQL finds all child records and calculates their correct level,
                -- which is max(parent_level) + 1.
                v_sql := format(
                    $sql$
                    SELECT jsonb_agg(c.child_data)
                    FROM (
                        SELECT
                            jsonb_build_object(%1$s) || jsonb_build_object('__level', max(parent.__level) + 1) as child_data
                        FROM %2$I.%3$I AS child
                        JOIN jsonb_to_recordset($1) AS parent(%4$s)
                          ON (%5$s) = (%6$s)
                        GROUP BY %7$s
                    ) c
                    $sql$,
                    fk_relation.child_pk_def_string,
                    fk_relation.child_schema,
                    fk_relation.child_table,
                    v_parent_record_def,
                    (SELECT string_agg(format('child.%I', c), ', ') FROM unnest(fk_relation.child_columns) c),
                    (SELECT string_agg(format('parent.%I', c), ', ') FROM unnest(fk_relation.parent_columns) c),
                    fk_relation.child_pk_cols_for_group_by
                );

                EXECUTE v_sql INTO v_found_children_with_level USING v_parent_data;

                -- Insert new children or update the level of existing children if a longer path is found.
                IF v_found_children_with_level IS NOT NULL THEN
                    WITH new_data AS (
                        SELECT
                            (elem.value ->> '__level')::int AS level,
                            fk_relation.child_oid AS table_oid,
                            elem.value - '__level' AS pk_values -- Remove the temporary level key
                        FROM jsonb_array_elements(v_found_children_with_level) AS elem
                    ),
                    upserted AS (
                        INSERT INTO delete_plan (level, table_oid, pk_values)
                        SELECT nd.level, nd.table_oid, nd.pk_values FROM new_data nd
                        ON CONFLICT (table_oid, pk_values) DO UPDATE
                        SET level = excluded.level
                        WHERE delete_plan.level < excluded.level -- Only update if the new level is higher
                        RETURNING 1
                    )
                    SELECT count(*) INTO v_temp_rows_affected FROM upserted;
                    v_rows_affected_in_iteration := v_rows_affected_in_iteration + v_temp_rows_affected;
                END IF;
            END;
        END LOOP;

        -- If no rows were added or updated in a full pass, the graph is stable and we can exit.
        EXIT WHEN v_rows_affected_in_iteration = 0;
    END LOOP;

    -- Step 3: Generate the DELETE statements from the completed plan.
    -- Statements are grouped per table and level, then ordered so that the deepest
    -- dependents (highest level) are deleted first.
    -------------------------------------------------------------------
    WITH pk_metadata AS (
        SELECT
            i.indrelid AS table_oid,
            array_agg(a.attname::text ORDER BY u.ord) as pk_names,
            array_agg(t.typname::text ORDER BY u.ord) as pk_types
        FROM pg_index i
        JOIN unnest(i.indkey) WITH ORDINALITY AS u(attnum, ord) ON TRUE
        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = u.attnum
        JOIN pg_type t ON t.oid = a.atttypid
        WHERE i.indisprimary AND i.indrelid IN (SELECT DISTINCT table_oid FROM delete_plan)
        GROUP BY i.indrelid
    )
    SELECT array_agg(s.stmt ORDER BY s.level DESC, s.stmt)
    INTO v_statements
    FROM (
        SELECT
            p.level,
            format(
                'DELETE FROM %I.%I WHERE %s;',
                ns.nspname, cls.relname,
                CASE
                    WHEN array_length(pk.pk_names, 1) = 1 THEN
                        format('%I IN (%s)',
                            pk.pk_names[1],
                            string_agg(format('%L::%s', p.pk_values ->> pk.pk_names[1], pk.pk_types[1]), ', ')
                        )
                    ELSE
                        format('(%s) IN (VALUES %s)',
                            (SELECT string_agg(quote_ident(c), ', ') FROM unnest(pk.pk_names) AS c),
                            string_agg('(' || (SELECT string_agg(format('%L::%s', p.pk_values ->> pk.pk_names[i], pk.pk_types[i]), ', ') FROM generate_series(1, array_length(pk.pk_names, 1)) i) || ')', ', ')
                        )
                END
            ) AS stmt
        FROM delete_plan p
        JOIN pg_class cls ON cls.oid = p.table_oid
        JOIN pg_namespace ns ON ns.oid = cls.relnamespace
        JOIN pk_metadata pk ON pk.table_oid = p.table_oid
        GROUP BY p.level, ns.nspname, cls.relname, pk.pk_names, pk.pk_types
    ) s;

    RETURN COALESCE(v_statements, '{}');
END;
$$ LANGUAGE plpgsql;
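
The usage example below assumes a schema roughly like the following (hypothetical users/posts/comments tables with bigint primary keys; none of the foreign keys declare ON DELETE CASCADE):

CREATE TABLE public.users (
    id BIGINT PRIMARY KEY
);

CREATE TABLE public.posts (
    id      BIGINT PRIMARY KEY,
    user_id BIGINT NOT NULL REFERENCES public.users (id)
);

CREATE TABLE public.comments (
    id      BIGINT PRIMARY KEY,
    post_id BIGINT NOT NULL REFERENCES public.posts (id)
);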

Calling it like this,

SELECT unnest(generate_recursive_delete_statements(
    'public',
    'users',
    '{"id": 1}'::jsonb
));

produces the DELETE statements required for the recursive deletion, in the correct order:

unnest                                                               |
---------------------------------------------------------------------+
DELETE FROM public.comments WHERE id IN ('100'::int8, '101'::int8);  |
DELETE FROM public.posts WHERE id IN ('10'::int8, '11'::int8);       |
DELETE FROM public.users WHERE id IN ('1'::int8);                    |
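
The function only generates SQL text; nothing is deleted yet. One way to actually run the statements is to loop over the returned array in a DO block inside a single transaction. This is a minimal sketch, not part of the function itself:

BEGIN;

DO $run$
DECLARE
    v_stmt TEXT;
BEGIN
    -- FOREACH preserves the array order, so children are deleted before parents.
    FOREACH v_stmt IN ARRAY generate_recursive_delete_statements('public', 'users', '{"id": 1}'::jsonb)
    LOOP
        RAISE NOTICE 'executing: %', v_stmt;
        EXECUTE v_stmt;
    END LOOP;
END;
$run$;

COMMIT;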

To start the deletion from more than one record, the following wrapper function can be used.

CREATE OR REPLACE FUNCTION generate_recursive_delete_statements_many(
    p_schema_name TEXT,
    p_table_name TEXT,
    p_pk_values_array JSONB
)
RETURNS TEXT[] AS $$
/**
 * @description Accepts an array of primary key values and generates a comprehensive list of DELETE
 * statements for all specified rows and their dependents. It acts as a wrapper around the
 * single-row `generate_recursive_delete_statements` function.
 *
 * @param p_schema_name TEXT - The schema name of the starting table.
 * @param p_table_name TEXT - The table name of the starting table.
 * @param p_pk_values_array JSONB - A JSON array of primary key objects.
 * e.g., '[{"id": 123}, {"id": 124}]'
 *
 * @returns TEXT[] - A combined and ordered array of DELETE statements for all provided primary keys.
 */
DECLARE
    v_all_statements TEXT[] := '{}';
    v_pk_value JSONB;
    v_statements_for_one TEXT[];
BEGIN
    -- Validate that the input is a JSON array
    IF jsonb_typeof(p_pk_values_array) != 'array' THEN
        RAISE EXCEPTION 'p_pk_values_array must be a JSON array. e.g., ''[{"id": 123}, {"id": 124}]''.';
    END IF;

    -- Loop through each primary key object in the input array
    FOR v_pk_value IN SELECT * FROM jsonb_array_elements(p_pk_values_array)
    LOOP
        -- Call the original function for a single primary key
        SELECT generate_recursive_delete_statements(p_schema_name, p_table_name, v_pk_value)
        INTO v_statements_for_one;

        -- Concatenate the returned statements into the main array
        v_all_statements := v_all_statements || v_statements_for_one;
    END LOOP;

    -- Note: This simple concatenation may result in duplicate DELETE statements if multiple
    -- starting rows share common dependents. This is generally harmless as subsequent
    -- DELETEs for the same row will affect 0 rows. The critical deletion order is preserved
    -- for each dependency chain.

    RETURN v_all_statements;
END;
$$ LANGUAGE plpgsql;

Pass the primary keys of the target records as a JSON array:

SELECT unnest(generate_recursive_delete_statements_many(
    'public',
    'users',
    '[{"id": 1}, {"id": 2}, {"id": 3}]'::jsonb
));
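
In psql, the combined statements can also be executed directly with \gexec, which runs each row of the preceding query result as a statement. Again this is only a sketch; WITH ORDINALITY plus ORDER BY keeps the child-before-parent order of the returned array:

BEGIN;

SELECT t.stmt
FROM unnest(generate_recursive_delete_statements_many(
         'public',
         'users',
         '[{"id": 1}, {"id": 2}, {"id": 3}]'::jsonb
     )) WITH ORDINALITY AS t(stmt, ord)
ORDER BY t.ord
\gexec

COMMIT;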