手动查询传播

当用户发出查询时,Citus 协调器将其划分为更小的查询片段 其中每个查询片段可以在工作分片上独立运行。 这允许 Citus 将每个查询分布在集群中。

但是,将查询划分为片段的方式(以及传播哪些查询)因查询类型而异。 在某些高级情况下,手动控制此行为很有用。 Citus 提供实用函数来将 SQL 传播到 worker、分片或共置放置(colocated placements)。

手动查询传播绕过 coordinator 逻辑、锁定(locking)和任何其他一致性检查。 这些函数是允许 Citus 原生无法运行的语句的最后手段。仔细使用它们以避免数据不一致和死锁。

在所有 Worker 上运行

最小的执行级别是广播一条语句以在所有 worker 上执行。 这对于查看整个 worker 数据库的属性很有用。

-- List the work_mem setting of each worker database
SELECT run_command_on_workers($cmd$ SHOW work_mem; $cmd$);

Note

不应使用此命令在 worker 上创建数据库对象,因为这样做会使以自动方式添加 worker 节点变得更加困难。

Note

本节中的 run_command_on_workers 函数和其他手动传播命令只能运行返回单列单行的查询。

在所有分片上运行

下一个粒度级别是跨特定分布式表的所有分片运行命令。 它可以很有用,例如,直接在 worker 上读取表的属性。 在 worker 节点本地运行的查询可以完全访问元数据,比如表统计信息。

run_command_on_shards 函数将 SQL 命令应用于每个分片,其中提供分片名称以在命令中进行插值。 这是一个估计分布式表行数的示例,通过使用每个工作人员上的 pg_class 表来估计每个分片的行数。请注意将替换为每个分片名称的 %s

-- Get the estimated row count for a distributed table by summing the
-- estimated counts of rows for each shard.
SELECT sum(result::bigint) AS estimated_count
  FROM run_command_on_shards(
    'my_distributed_table',
    $cmd$
      SELECT reltuples
        FROM pg_class c
        JOIN pg_catalog.pg_namespace n on n.oid=c.relnamespace
       WHERE (n.nspname || '.' || relname)::regclass = '%s'::regclass
         AND n.nspname NOT IN ('citus', 'pg_toast', 'pg_catalog')
    $cmd$
  );

run_command_on_shards 的一个有用伴侣是 run_command_on_colocated_placements。 它将 co-located 的分布式表的 个位置的名称插入到查询中。 放置对总是被选择为本地的同一个 worker,其中完整的 SQL 覆盖是可用的。 因此,我们可以使用触发器等高级 SQL 功能来关联表:

-- Suppose we have two distributed tables
CREATE TABLE little_vals (key int, val int);
CREATE TABLE big_vals    (key int, val int);
SELECT create_distributed_table('little_vals', 'key');
SELECT create_distributed_table('big_vals',    'key');

-- We want to synchronize them so that every time little_vals
-- are created, big_vals appear with double the value
--
-- First we make a trigger function, which will
-- take the destination table placement as an argument
CREATE OR REPLACE FUNCTION embiggen() RETURNS TRIGGER AS $$
  BEGIN
    IF (TG_OP = 'INSERT') THEN
      EXECUTE format(
        'INSERT INTO %s (key, val) SELECT ($1).key, ($1).val*2;',
        TG_ARGV[0]
      ) USING NEW;
    END IF;
    RETURN NULL;
  END;
$$ LANGUAGE plpgsql;

-- Next we relate the co-located tables by the trigger function
-- on each co-located placement
SELECT run_command_on_colocated_placements(
  'little_vals',
  'big_vals',
  $cmd$
    CREATE TRIGGER after_insert AFTER INSERT ON %s
      FOR EACH ROW EXECUTE PROCEDURE embiggen(%L)
  $cmd$
);

限制

  • 多语句事务没有防止死锁的安全措施。

  • 没有针对中间查询失败和由此产生的不一致性的安全防护措施。

  • 查询结果缓存在内存中;这些函数无法处理非常大的结果集。

  • 如果函数无法连接到节点,则会提前出错。

  • 你可以做很坏的事情!