Návrh a realizace UDF v c pro PostgreSQL

Z PostgreSQL
Přejít na: navigace, hledání

Pozor - příklady v novějších verzích PostgreSQL (8.3 a novější) nelze použít bez úpravy.

Návrh funkcí

Rozšiřování PostgreSQL pomocí C funkcí není příliš komplikované (je srovnatelné s návrhem aplikací pro jiné toolkity), přesto je složitější než návrh funkce v PL/pgSQL nebo v jiném z podporovaných jazyků. Minimálně se komplikuje překlad a testování funkcí. Prakticky nepřenositelný kód spolu s vyšší pracností je zřejmě důvodem skutečnosti, že se s knihovnami napsanými v C se příliš nesetkáte. Na druhou stranu sve funkce (User Defined Functions) v C navrhneme mnohem rychlejší a úspornější. Kromě vlastních funkcí máme v PostgreSQL možnost navrhovat vlastní datové typy. Použitím vlastního datového typu se prakticky vzdáme přenositelnosti své aplikace na jiné RDBMS, na oplátku naše aplikace může být opět o něco efektivnější a čitelnější (bohužel pouze pro Vás).

Jakákoliv funkce kterou chceme použít v PostgreSQL musí vycházet z následujícího schématu.

PG_FUNCTION_INFO_V1 (jméno_funkce);

Datum jméno_funkce (PG_FUNCTION_ARGS)
{
  BpChar *arg1 = PG_GETARG_BPCHAR_P (0);

  PG_RETURN_BOOL (false);
}

Jedná se o tzv. první verzi volající konvence. Jelikož se tzv. nultou variantu nedoporučuje používat, nebudu se o ní více zmiňovat než, že se jedná o klasickou definici funkce v C.

Paremetry funkce můžeme předávat odkazem nebo hodnotou. Makra pro předání hodnoty odkazem končí _P. Číslo označuje pořadí získaného parametru. Hodnotou můžeme předávat pouze typy fixní velikosti jejichž velikost není větší než 4 byty (resp. sizeof(Datum)). Datum nemá nic společného s datumy, je to tzv univerzální datový typ.

Pro úplnost uvádím rozvinutý tvar maker s příkladu.

extern Pg_finfo_record * pg_finfo_check_rc (void);
Pg_finfo_record * pg_finfo_check_rc (void) 
{
  static Pg_finfo_record my_finfo = { 1 };
  return &my_finfo; 
}

Datum jméno_funkce (FunctionCallInfo fcinfo)
{
  BpChar *arg1 = ((BpChar *) pg_detoast_datum ((struct varlena *) 
	 ((Pointer) ((fcinfo->arg[0]))));

  return ((Datum) (((bool) 0)) ? 1 : 0));
}

Pokud funkci při deklaraci v PostgreSQL neoznačíme jako isStrict, musíme každý parametr otestovat před použitím na hodnotu NULL (a případně vrátit hodnotu NULL).

IF (PG_ARGISNULL (0)) PG_RETURN_NULL ();

Po přeložení a zkopírování knihovny do lib adresáře PostgreSQL (obvykle /usr/local/pgsql/lib), můsíme ještě funkci vytvořit v PostgreSQL.

CREATE OR REPLACE FUNCTION jméno_funkce (char) RETURNS bool
  AS 'knihovna.so', 'jméno_funkce' LANGUAGE 'C';

Návrh funkcí komplikuje fakt, že PostgreSQL nepoužívá (více-méně) klasický nulou ukončený řetězec, ale řetězec s pevnou velikostí, tj. první 4 byte řetězce nese velikost řetězce (včetně hlavičky). Naštěstí máme pro operace s řetězci připraveno několik maker (můžeme je použít pro typy BpChar*, VarChar* a text* - odpovídají PostgreSQL typům char, varchar, text). Makra jsou postavena nad strukturou varlena (obsahuje dvě pole va_header a va_content).

// ziskani velikosti a vlastniho retezce

BpChar *arg1 = PG_GETARG_BPCHAR_P (0);
long len = VARSIZE (arg1) - VARHDRSZ;
char *sf = VARDATA (arg1); // pozor nejedna se o sz

// vytvoření řetězce a jeho naplnění jedním znakem

BpChar *result = palloc (VARHDRSZ + 1);
VARATT_SIZEP (result) = VARHDRSZ + 1;
*(VARDATA (result)) = 'M';

K alokování paměti se použíjeme funkci palloc místo obvyklého malloc. Pamět alokovaná touto funkcí (palloc) je automaticky uvolněna po ukončení transakce.

Typ DateADT, který používám níže, se v PostgreSQL používá k uchování datumu. PostgreSQL používá pro uložení datumu celé dlouhé číslo se znaménkem, interpretovatelné jako počet dní od 1.1.2000. Jednoduše lze toto číslo (pro konkrétní datum) získat převodem do juliánovského kalendáře.

// převod 10.2.1973 do DateADT

DateADT d = date2j (1973, 2, 10) - date2j (2000, 1, 1);

// získání roku, měsíce a dne z DateADT

int rok, mesic, den; DateADT d = -9821;
j2date (d + date2j (2000, 1, 1), &rok, &mesic, &den);

Funkce date2j, j2date naleznete v src/backend/utils/adt/datetime.c v zdrojových souborech PostgreSQL. Při převodu se nekontroluje formát datumu, tj bez chyby se převede i 32.13.2002.

Vlastní implementace funkcí dateserial a timeserial

Začneme s jednoduššími funkcemi. PostgreSQL standardně neobsahuje funkce dateserial a timeserial, které známe z Visual Basicu, případně VBA. Bez těchto funkcí se sice obejdeme, musíme ale provést relativně složité a tudíž i pomalé přeformátování trojice čísel na řetězec ve formátu, který je pro PostgreSQL čitelný.

Se základními znalostmi dokážeme vytvořit následující funkce (nultá varianta - bez kontrol)

#include "postgres.h"
#include "fmgr.h"
#include "utils/date.h"
#include "utils/nabstime.h"

PG_FUNCTION_INFO_V1 (dateserial);
PG_FUNCTION_INFO_V1 (timeserial);

Datum dateserial (PG_FUNCTION_ARGS)
{
  int32 year = PG_GETARG_INT32(0);
  int32 month = PG_GETARG_INT32(1);
  int32 day = PG_GETARG_INT32(2);

  DateADT d = date2j (year, month, day) - POSTGRES_EPOCH_JDATE;
  PG_RETURN_DATEADT(d);
}

Datum timeserial (PG_FUNCTION_ARGS)
{
  int32 hour = PG_GETARG_INT32(0);
  int32 minutes = PG_GETARG_INT32(1);
  int32 seconds = PG_GETARG_INT32(2);

  TimeADT result = (((hour * 60) + minutes) * 60) + seconds;  
  PG_RETURN_TIMEADT(result);
}

Jelikož tyto funkce nemají žádnou kontrolu parametrů,lze je jen těžko reálně používat. Pokud však zadáme parametry ze správných intervalů, pak tyto funkce pracují správně. Na těchto funkcích není nic komplikovaného.

Správné nastavení parametrů překladu nám zajistí Makefile v následující podobě

PG_INCLUDE=`pg_config --includedir-server`
PG_LIBDIR=`pg_config --libdir`
PG_PACKAGE=`pg_config --pkglibdir`

all: datefunc

datefunc:
	gcc -ggdb -fpic -I$(PG_INCLUDE) datefunc.c -c -o datefunc.o
	gcc -ggdb -shared -o datefunc.so datefunc.o
	cp datefunc.so $(PG_PACKAGE)/datefunc.so

Po doplnění o nutné kontroly můžeme tyto funkce zpřístupnit i ostatním uživatelům. Kód kontrol vychází z kódu modulu datetime.c a přebírá některé proměnné, funkce a makra. Kód byl ještě doplněn o funkce pro převod datumů z utc formátu a MSSQL formátu do PostgreSQL timestamp typu. Tyto funkce použijeme zejména v okamžiku, kdy musíme bezpečně přenést datumové položky mezi aplikačními vrstvami např. javascript->asp-> PostgreSQL. Zejména použití asp si vynucuje použití těchto funkcí, jelikož konverze string datum je závislá na nastavení národního prostředí serveru

PG_FUNCTION_INFO_V1 (dateserial);
PG_FUNCTION_INFO_V1 (timeserial);
PG_FUNCTION_INFO_V1 (utc_timestamp);
PG_FUNCTION_INFO_V1 (mssql_timestamp);

Datum dateserial (PG_FUNCTION_ARGS)
{
  int32 year = PG_GETARG_INT32(0);
  int32 month = PG_GETARG_INT32(1);
  int32 day = PG_GETARG_INT32(2);

  if (year >= 0 && year < 73) 
    year += 2000;
  else if (year >= 0 && year < 100)
    year += 1900;

  if (month > 12 || day > 31)
    {
      ereport(ERROR, 
	      (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
	       errmsg("month or day out of range")));
    }
  else if (day > day_tab[isleap(year)][month - 1]) 
    {
      ereport(ERROR, 
	      (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
	       errmsg("day out of range")));
    }
  else if (IS_VALID_UTIME(year, month, day)) 
    {
      DateADT d = date2j (year, month, day) - POSTGRES_EPOCH_JDATE;
      PG_RETURN_DATEADT(d);
    } 
  else
    {
      ereport(ERROR, 
	      (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
	       errmsg("day out of range")));
    }
}

Datum timeserial (PG_FUNCTION_ARGS)
{
  int32 hour = PG_GETARG_INT32(0);
  int32 minutes = PG_GETARG_INT32(1);
  int32 seconds = PG_GETARG_INT32(2);

  if (hour < 0 || hour > 59 || minutes < 0 || minutes > 59 
      || seconds < 0 || seconds > 59)
      ereport(ERROR, 
	      (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
	       errmsg("time out of range")));
  else 
    {
      TimeADT result = (((hour * 60) + minutes) * 60) + seconds;  
      PG_RETURN_TIMEADT(result);
    }
}

Datum utc_timestamp (PG_FUNCTION_ARGS)
{
  int64 utc = PG_GETARG_INT64(0);

  Timestamp result = utc /1000 - 
    ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 86400);

  PG_RETURN_TIMESTAMP(result);
}

Datum mssql_timestamp (PG_FUNCTION_ARGS)
{
  float mstime = PG_GETARG_FLOAT4(0);

  Timestamp result = ((mstime - 25567.0) * 86400.0) - 
    ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 86400);

  PG_RETURN_TIMESTAMP(result);
}

Registrační SQL soubor pro výše uvedené funkce může mít tuto podobu

 CREATE FUNCTION mssql_timestamp(float4) RETURNS timestamp 
     AS 'datefunc.so', 'mssql_timestamp' LANGUAGE 'C' STRICT;

   CREATE FUNCTION utc_timestamp(bigint) RETURNS timestamp 
     AS 'datefunc.so', 'utc_timestamp' LANGUAGE 'C' STRICT;

   CREATE FUNCTION timeserial(int, int, int) RETURNS time 
     AS 'datefunc.so', 'timeserial' LANGUAGE 'C' STRICT;

   CREATE FUNCTION dateserial(int, int, int) RETURNS date 
     AS 'datefunc.so', 'dateserial' LANGUAGE 'C' STRICT;

   GRANT EXECUTE ON FUNCTION dateserial(int, int, int) TO PUBLIC;
   GRANT EXECUTE ON FUNCTION timeserial(int, int, int) TO PUBLIC;
   GRANT EXECUTE ON FUNCTION utc_timestamp(bigint) TO PUBLIC;
   GRANT EXECUTE ON FUNCTION mssql_timestamp(float) TO
   PUBLIC;

Kontrola rodného čísla

Následující příklad obsahuje funkce pro kontrolu rodného čísla (test na nulové modulo 11), získání data narození z rodného čísla a získání pohlaví.

Začneme funkcí pro parsování rodného čísla. Je možné zadat rodné číslo s nebo bez znaku lomítka oddělující datum narození od indexu. Funkce vrací nulu nebo číslo chyby. Funkce toleruje za rodným číslem max. 90 mezer (abych mohl funkci použít i pro typ CHAR(n)).

char *chyba [] = {
  "Rodné číslo je příliš dlouhé",
  "Symbol / je na špatné pozici",
  "Rodné číslo obsahuje nepřípustný znak",
  "Rodné číslo má chybný formát",
  "Rodné číslo vytvořené před rokem 1954 může obsahovat pouze devět číslic",
  "Modulo 11 rodného čísla není nula",
  "Rodné číslo může obsahovat pouze 9 nebo deset číslic"};

#define MAXRCLEN     100

int parsen_rodne_cislo (char *rc, int parts[3], long length)
{
  int i = 0, s2 = 0; char c; 
  int64 s = 0;
  
  while (length--)
    {
      c = *rc++;
      if (c == ' ')
        break;
      if (i > 10)
        return 1;
      if (c == '/')
        if (i == 6)
	  continue;
	else
	  return 2;
      if (c <'0' || c > '9')
        return 3;
      s  = (s  * 10) + (c - '0');
      s2 = (s2 * 10) + (c - '0');
      i++;
      
      if (i == 2) {parts[0] = s2; s2 = 0;}
      if (i == 4) {parts[1] = s2; s2 = 0;}
      if (i == 6) {parts[2] = s2; s2 = 0;}           
    }
    
  while (length-- > 0)
    {
      if (i > MAXRCLEN)
        return 1;
      if (*rc++ != ' ')
        return 4;
      i++;
    }
  if (i == 9)
    if (parts [0] > 53)
      return 5;
    else
       parts [0] += 1900;
  else
    if (i == 10)
      {
        if (parts [0] > 53)
          parts [0] += 1900;
        else
          parts [0] += 2000;
        if ((s % 11) != 0)
	  return 6;
      }
    else
      return 7;

  parts[3] = s2;
  return 0;
}

U rodného čísla platí následující pravidla:

  • rodná čísla vydaná před rokem 1954 mají pouze třímístný index a nelze je testovat na modulo 11.
  • rodná čísla vydaná po roce 1954 včetně mají čtyřmístný index a lze je testovat na modulo 11.
  • století, kdy bylo rodné číslo vydáno se pozná podle počtu číslic v indexu. Pokud mám počáteční číslice např. 19 a trojmístný index, pak jde o rok 1919, pokud bych měl čtyřmístný index, pak se jedná o rok 2019.

Nyní se dostávám k samotným funkcím, které budou volatelné s PostgreSQL. Musí být proto nadeklarovaná dle v1 konvence.

PG_FUNCTION_INFO_V1 (check_rc);
PG_FUNCTION_INFO_V1 (birth_date);
PG_FUNCTION_INFO_V1 (sex);

Datum
check_rc (PG_FUNCTION_ARGS)
{
  int parts [] = {0, 0, 0, 0}, result;
  BpChar *rc = PG_GETARG_BPCHAR_P (0);
  result = parsen_rodne_cislo (VARDATA (rc), parts, VARSIZE (rc) - VARHDRSZ);

  if (result == 0)
    PG_RETURN_BOOL (true);
  else
    {
      elog (WARNING, chyba [result - 1]);
      PG_RETURN_BOOL (false);
    }
}

Datum
birth_date (PG_FUNCTION_ARGS)
{
  int parts [] = {0, 0, 0, 0}, result; DateADT d;
  BpChar *rc = PG_GETARG_BPCHAR_P (0);
  result = parsen_rodne_cislo (VARDATA (rc), parts, VARSIZE (rc) - VARHDRSZ);

  if (result != 0)
    elog (ERROR, chyba [result - 1]);

  if (parts[1] > 50)
    parts [1] -= 50;

  d = date2j (parts[0],parts[1], parts[2]) - date2j (2000, 1, 1);  
  PG_RETURN_DATEADT (d);
}

Datum
sex (PG_FUNCTION_ARGS)
{
  int parts [] = {0, 0, 0, 0}, result; char s;
  BpChar *rc = PG_GETARG_BPCHAR_P (0);
  result = parsen_rodne_cislo (VARDATA (rc), parts, VARSIZE (rc) - VARHDRSZ);
  BpChar *res;

  if (result != 0)
    elog (ERROR, chyba [result - 1]);

  if (parts[1] > 50) s = 'F'; else s = 'M';

  res = palloc (VARHDRSZ + 1);
  VARATT_SIZEP (res) = VARHDRSZ + 1;
  *(VARDATA (res)) = s;

  PG_RETURN_BPCHAR_P (res);
}

Funkce elog slouží k zobrazení na uživatelskou konzolu. Při úrovni ERROR dojde k přerušení vykonávání funkce. Při úrovni WARNING pouze k vypsání hlášení. Používá se stejně jako v PL/pgSQL konstrukce RAISE.

SQL příkazy, které nám založí funkce do PostgreSQL jsou následující: Provést tyto příkazy může pouze uživatel postgres.

CREATE OR REPLACE FUNCTION check_rc (char) RETURNS bool
  AS 'rc.so', 'check_rc' LANGUAGE 'C' STRICT;

CREATE OR REPLACE FUNCTION birth_date (char) RETURNS date
  AS 'rc.so', 'birth_date' LANGUAGE 'C' STRICT;

CREATE FUNCTION sex (char) RETURNS char
  AS 'rc.so', 'sex' LANGUAGE 'C' STRICT;

Použil jsem jednoduchý Makefile (předpokládám, že zdrojové soubory k PostgreSQL jsou v /usr/src/postgresql-7.3b1/src/include.

PG_INCLUDE = '/usr/src/postgresql-7.3b1/src/include'
PG_LIBDIR = `pg_config --libdir`
PG_PACKAGE = `pg_config --pkglibdir`

all:
	gcc -ggdb -fpic -I$(PG_INCLUDE) rodne_cislo_fce.c -c -o rc.o
	gcc -ggdb -shared -o rc.so rc.o
	cp rc.so $(PG_PACKAGE)/rc.so

Pokud se vše podaří, můžete si vytvořené funkce vyzkoušet.

// rodné číslo je vymyšlené
SELECT check_rc('806115/0262');
SELECT birth_date('806115/0262');
SELECT sex('806115/0262');

Ladění

Poznámka k ladění. Vývoj funkcí v C má i své příjemnější stránky. Jednou z nich je možnost krokovaní při ladění funkce (díky za nakopnutí v konferenci). K ladění můžete použít obyčejný gdb debugger nebo některého z jeho komfortnějších následovníků (např. ddd).Je třeba si uvědomit, že se funkce vykonává na servru, který běží pod jiným účtem než klient. Ladit aplikaci můžeme pouze pod uživatelem, který je vlastníkem laděného procesu. V tomto případě se jedná o uživatele postgres. Celý postup by měl být zřejmý z následujících příkazů.

  • spustím si nového klienta psql a nechám provést alespoň jednou
  • testovanou funkci, aby se natáhla knihovna
[pavel@localhost]$su postgres
[pavel@localhost]$ps -ax | grep postgres
1940 ? S  0:00 postgres: pavel testdb011 [local] idle

[pavel@localhost]$gdb
(gdb)attach 1940
(gdb)break check_rc
Breakpoint 1 at 0x404040fa: file rc.c line 50.
# v psql spustíme provádění funkce
(gdb)list 50
47
48 Datum
49 check_rc (PG_FUNCTION_ARGS)
50 {
51   int parts [3], result;
52   BpChar *rc = PG_GETARG_BPCHAR (0);
        ...

(gdb)step
51   int parts [3], result;
(gdb)step
52   BpChar *rc = PG_GETARG_BPCHAR (0);
(gdb)...
(gdb)continue
(gdb)detach
Detaching from program /usr/local/pgsql/bin/postgres, process 1940
(gdb)quit
[postgresql@localhost]$

Návrh vlastních datových typů

Z minulé kapitoly máme k dispozici funkce pro určení validity, datumu narozeí a pohlaví z rodného čísla. Zatím jsem předpokládal, že rodné číslo ukládám jako CHAR(11). Zřejmou nevýhodou je, že vždy když chci určit některý atribut rodného čísla dochází k parsování řetězce. Navíc z důvodu efektivnosti nemohu použít rodné číslo jako index a ani nemohu pořádně třídit podle něj, jelikož mohu, ale nemusím v rodném čísle použít znak lomítko.

Tyto problémy můžeme vyřešit vytvořením vlastního typu (zvýší se ale režie při zobrazení hodnoty). Samotné vytvoření typu spočívá v:

  • definice typ
  • vytvoření konverznich funkcí mezi naším typem a typem cstring
  • vytvoření funkcí pro operátory
  • registrace typu a registrace operátorů

Typ rodné číslo se skládá z datumu narození a indexu. Jelikož index je vždy menší nežli 32767 mohu nejvyšší bit použít pro určení pohlaví.

typedef struct rodne_cislo {
  DateADT narozen;
  int16   index;
} rodne_cislo;

Velikost typu je 8 bytů, tudíž všem funkcím bude předáván odkazem. Dodefinuji si makra (jde jen o přetypování ukazatele).

#define DatumGetRodneCisloP(X)    ((rodne_cislo*) DatumGetPointer(X))
#define RodneCisloPGetDatum(X)    PointerGetDatum(X)
#define PG_GETARG_RC_P(n)         DatumGetRodneCisloP(PG_GETARG_DATUM(n))
#define PG_RETURN_RC_P(X)         return RodneCisloPGetDatum(X)

Základem jsou konverzní funkce z a do typu cstring. PostgreSQL je automaticky použije při zobrazení a zadávání konkrétní hodnoty. Doporučuje se tyto funkce pojmenovat typ_out a typ_in. V příkladu je použita funkce parsen_rodné_číslo z předchozího dílu

PG_FUNCTION_INFO_V1 (rodne_cislo_in);
PG_FUNCTION_INFO_V1 (rodne_cislo_out);

Datum
rodne_cislo_in (PG_FUNCTION_ARGS)
{
  int parts [] = {0, 0, 0, 0}, result, pohlavi;
  char *rcstr; rodne_cislo *rc;

  if (PG_ARGISNULL (0)) 
    PG_RETURN_NULL ();
  
  rcstr = PG_GETARG_CSTRING (0);
  
  result = parsen_rodne_cislo (rcstr, parts, strlen (rcstr));
  if (result != 0)
    elog (ERROR, chyba [result - 1]);

  rc = (rodne_cislo*) palloc (sizeof (rodne_cislo));

  if (parts [1] > 50)
    {
      pohlavi = 0;
      parts [1] -= 50;
    }
  else
    pohlavi = 1;  

  rc -> narozen = date2j (parts [0], parts [1], parts [2]) -
    date2j (2000, 1, 1);
  rc -> index = (parts [3] | pohlavi <<15);

  PG_RETURN_RC_P (rc);
}

char *sts_ito2d (int i, char *temp);
  
Datum
rodne_cislo_out (PG_FUNCTION_ARGS)
{
  int r, m, d, r2, length;
  char temp [10], *rcstr;
  rodne_cislo *rc;

  if (PG_ARGISNULL (0)) 
    PG_RETURN_NULL ();

  rc = PG_GETARG_RC_P (0);

  j2date (rc -> narozen + date2j (2000, 1, 1), &r, &m, &d);

  if ((rc -> index & 32768) == 0) m += 50;

  if      (r > 2000) r2 = r - 2000;
  else if (r > 1900) r2 = r - 1900;
  else if (r > 1800) r2 = r - 1800;

  rcstr = (char*) palloc (12); rcstr [0] = '\0';

  strcat (rcstr, sts_ito2d (r2, temp));
  strcat (rcstr, sts_ito2d (m,  temp));
  strcat (rcstr, sts_ito2d (d,  temp));

  sprintf (temp, "%d", rc -> index & 32767);
  length = strlen (temp);

  if (r <= 1953)
    {
      strcat (rcstr, "/000");
      strcpy (&rcstr [10 - l], temp);
    }
  else
    {
      strcat (rcstr, "/0000");
      strcpy (&rcstr [11 - l], temp);
    }
  
  PG_RETURN_CSTRING (rcstr);
}

char *sts_ito2d (int i, char *buf)
{
  if (i <10)
    sprintf (buf, "0%d", i);
  else
    sprintf (buf, "%d",  i);
  return buf;
}

Nyní již můžeme deklarovat tyto funkce a typ rodne_cislo v PostgreSQL.

DROP TYPE rodne_cislo CASCADE;

CREATE OR REPLACE FUNCTION rodne_cislo_in (cstring) 
  RETURNS rodne_cislo AS 'rc.so','rodne_cislo_in' LANGUAGE 'C';

CREATE OR REPLACE FUNCTION rc_out (rodne_cislo) 
  RETURNS cstring AS 'rc.so', 'rodne_cislo_out' LANGUAGE 'C';

-- poprve se vypise varovani, ze typ rodne_cislo neni

CREATE TYPE rodne_cislo (
  internallength = 8,
  input          = rc_in,
  output         = rc_out
);

Teď už nám nic nebrání navrhnout tabulku osoby a vložit do ní jeden záznam.

CREATE TABLE osoby (
  rc rodne_cislo, 
  jmeno VARCHAR(20), 
  prijmeni VARCHAR(20)
);

INSERT INTO osoby VALUES ('7307150807','Pavel','Stěhule');

Můžeme si napsat funkce, které jsou obdobou funkcí sex a birth_date s předchozího dílu seriálu. Čistější by bylo (alespoň v případě data narození) napsat si vlastní přetypování, tedy datum narození získáme přetypováním rodného čísla do datumu, pohlaví do charu.

Konverze musí být realizovány funkcí, kdy parametrem je hodnota v původním typu a výsledkem hodnota v novém typu.

PG_FUNCTION_INFO_V1 (rodne_cislo_todate);
PG_FUNCTION_INFO_V1 (rodne_cislo_dobpchar);

Datum
rodne_cislo_todate (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc = PG_GETARG_RC_P (0);

  PG_RETURN_DATEADT (rc -> narozen);
}

Datum
rodne_cislo_tobpchar (PG_FUNCTION_ARGS)
{
  rodne_cislo * rc = PG_GETARG_RC_P (0);
  BpChar *result = (BpChar *) palloc (VARHDRSZ + 1);
  VARATT_SIZEP (result) = VARHDRSZ + 1;
  if (rc -> index & 32768)
    *(VARDATA (result)) = 'M';
  else
    *(VARDATA (result)) = 'F';

  PG_RETURN_BPCHAR_P (result);
}

V SQL nadeklarujeme nové funkce a přetypování

CREATE OR REPLACE FUNCTION rodne_cislo_todate (rodne_cislo)
  RETURNS date AS 'rc.so', 'rodne_cislo_todate' 
  LANGUAGE 'C' STRICT IMMUTABLE;

CREATE OR REPLACE FUNCTION rodne_cislo_tobpchar (rodne_cislo)
  RETURNS char AS 'rc.so', 'rodne_cislo_tobpchar' 
  LANGUAGE 'C' STRICT IMMUTABLE;

DROP CAST (rodne_cislo AS date);

CREATE CAST (rodne_cislo AS date) 
  WITH FUNCTION rodne_cislo_todate (rodne_cislo);

DROP CAST (rodne_cislo AS char);

CREATE CAST (rodne_cislo AS char)
  WITH FUNCTION rodne_cislo_tobpchar (rodne_cislo);

Funkce, které chceme použít pro přetypování musí být IMMUTABLE, tj. výsledek funkce, záleží pouze na parametrech funkce. STRICT mi zajišťuje, že pokud by byl parametr NULL, pak je automaticky výsledkem funkce NULL. Odpadá tím nutnost kontrolovat vstupní parametry na NULL.

Přetypování můžeme vyzkoušet příkazem

SELECT CAST ('7307150807'::rodne_cislo AS date);

Tvorba operátorů pro vlastní datové typy

V minulé kapitole jsem předvedl jak mohu v PostgreSQL vytvářet vlastní datové typy. S návrhem typu rodne_číslo jsem skončil zhruba v polovině. Ještě nemůžeme podle rodného čísla vyhledávat, třídit, stejně tak indexovat.

Stále nám ještě chybí alespoň základní operátory =, <>, <, >, <=, >=. Jelikož test na rovnost nebo nerovnost je nejjednodušší, začnu s návrhem operátorů = a <>.

PG_FUNCTION_INFO_V1 (rodne_cislo_eq);
PG_FUNCTION_INFO_V1 (rodne_cislo_ne);

Datum
rodne_cislo_eq (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc1 = PG_GETARG_RC_P (0);
  rodne_cislo *rc2 = PG_GETARG_RC_P (1);

  PG_RETURN_BOOL ((rc1 -> narozen == rc2 -> narozen) 
               && (rc1 -> index   == rc2 -> index));
}

Datum
rodne_cislo_ne (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc1 = PG_GETARG_RC_P (0);
  rodne_cislo *rc2 = PG_GETARG_RC_P (1);

  PG_RETURN_BOOL ((rc1 -> narozen != rc2 -> narozen) 
               || (rc1 -> index   != rc2 -> index));
}

V SQL vytvoříme operátory = a <> následujícími příkazy

CREATE OR REPLACE FUNCTION rodne_cislo_eq (rodne_cislo, rodne_cislo)
  RETURNS bool AS 'rc.so','rodne_cislo_eq' LANGUAGE 'C' STRICT;

CREATE OR REPLACE FUNCTION rodne_cislo_ne (rodne_cislo, rodne_cislo)
  RETURNS bool AS 'rc.so', 'rodne_cislo_ne' LANGUAGE 'C' STRICT;

DROP OPERATOR = (rodne_cislo, rodne_cislo);

CREATE OPERATOR = (
  leftarg   = rodne_cislo,
  rightarg  = rodne_cislo,
  procedure = rodne_cislo_eq
);

DROP OPERATOR <> (rodne_cislo, rodne_cislo);

CREATE OPERATOR <> (
  leftarg   = rodne_cislo,
  rightarg  = rodne_cislo,
  procedure = rodne_cislo_ne
);

Návrh ostatních operátorů je o něco komplikovaněnší. Abych se neupsal navrhnu si funkci rodne_cislo_cmp. Ta vrací -1 pokud je první parametr větší než druhý, 0 pokud jsou stejné a jedničku když je druhý větší než první.

int sts_intcmp (long i1, long i2);

int rodne_cislo_cmp (rodne_cislo *rc1, rodne_cislo *rc2)
{
  int r1, m1, d1, r2, m2, d2, r;
  long d2000 = date2j (2000, 1, 1);

  // pokud jsou stejna pohlavi, staci porovnat narozen a index
  if ((rc1 -> index> 32768) == (rc2 -> index> 32768))
    {
      r = sts_intcmp (rc1 -> narozen, r2 -> narozen);
      if (r != 0) return r;
      return sts_intcmp (rc1 -> index, rc2 -> index);
    }

  j2date(rc1 -> narozen + d2000,&r1,&m1,&d1);  
  j2date(rc2 -> narozen + d2000,&r2,&m2,&d2);  

  r = sts_intcmp (r1,r2); if (r != 0) return r;

  if ((rc1 -> index& 32768) == 0) m1 += 20;
  if ((rc2 -> index& 32768) == 0) m2 += 20;

  r = sts_intcmp (m1,m2); if (r != 0) return r;
  r = sts_intcmp (d1,d2); if (r != 0) return r;

  return sts_intcmp (rc1 -> index, rc2 -> index);
}

int sts_intcmp (long i1, long i2)
{
  if (i1 == i1)
     return 0;
  return (i1 <i2? -1 : 1);
}

C funkce pro porovnávací operátory získáme jednoduše. Jelikož jsou si funkce podobné, nebudu je všechny uvádět.

PG_FUNCTION_INFO_V1 (rodne_cislo_lt);
PG_FUNCTION_INFO_V1 (rodne_cislo_gt);
PG_FUNCTION_INFO_V1 (rodne_cislo_le);
PG_FUNCTION_INFO_V1 (rodne_cislo_ge);

Datum
rodne_cislo_lt (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc1 = PG_GETARG_RC_P (0);
  rodne_cislo *rc2 = PG_GETARG_RC_P (1);

  PG_RETURN_BOOL (rodne_cislo_cmp (rc1, rc2) <0);
}

Datum
rodne_cislo_le (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc1 = PG_GETARG_RC_P (0);
  rodne_cislo *rc2 = PG_GETARG_RC_P (1);

  PG_RETURN_BOOL (rodne_cislo_cmp (rc1, rc2) <= 0);
}

Deklarace v SQL

CREATE OR REPLACE FUNCTION rodne_cislo_lt (rodne_cislo, rodne_cislo)
  RETURNS bool AS 'rc.so','rodne_cislo_lt' LANGUAGE 'C' STRICT;

CREATE OR REPLACE FUNCTION rodne_cislo_le (rodne_cislo, rodne_cislo)
  RETURNS bool AS 'rc.so', 'rodne_cislo_le' LANGUAGE 'C' STRICT;

DROP OPERATOR <(rodne_cislo, rodne_cislo);

CREATE OPERATOR <(
  leftarg   = rodne_cislo,
  rightarg  = rodne_cislo,
  procedure = rodne_cislo_le
);

DROP OPERATOR <= (rodne_cislo, rodne_cislo);

CREATE OPERATOR <= (
  leftarg   = rodne_cislo,
  rightarg  = rodne_cislo,
  procedure = rodne_cislo_le
);

U každého operátoru můžeme doplnit další atributy (pokud existují), které mohou mohou urychlit provádění dotazu (optimalizací dotazu) ve kterém se vyskytuje daný operátor.

  • Operace A je komutátorem operace B pokud pro libovolná x a y platí: (x A y) = (y B x). Např. = je komutátorem sama sebe, stejně tak <>. Pro <je komutátor > a naopak. + je komutátorem sama sebe. - nebo / komutátory nemají.
  • Operace A je negátorem operace B pokud pro libovolná x a y platí: (x A y) = NOT (x B y). Výsledkem obou operací musí být pouze logická hodnota. Negátorem = je <> a naopak, negátorem <je >= a naopak
  • klauzule RESTRICT popisuje chování operátoru pří jeho použití vůči konstantě. Máme na výběr několik možností
eqsel pro = výsledkem je malá podmnožina tabulky
neqsel pro <> výsledkem je skoro celá tabulka
scalarltsel pro < nebo <= výsledek záleží na pozici konstanty v tabulce, berou se menší hodnoty
scalargtsel pro > nebo >= totéž ale v opačném směru.
  • klauzule JOIN přiřazuje funkci odhadu selektivity operace. Smysl je podobný jako u RESTRICT. K dispozici jsou funkce eqjoinsel, neqjoinsel, scalarltjoinsel a scalargtjoinsel.

Po doplnění výše zmíněných klauzulé bude vypadat definice operátoru porovnání rodných čísel následovně:

DROP OPERATOR = (rodne_cislo, rodne_cislo);

CREATE OPERATOR = (
  leftarg    = rodne_cislo,
  rightarg   = rodne_cislo,
  procedure  = rodne_cislo_eq,
  commutator = =,
  negator    = <>,
  restrict   = eqsel,
  join       = eqjoinsel   
);

DROP OPERATOR <> (rodne_cislo, rodne_cislo);

CREATE OPERATOR <> (
  leftarg   = rodne_cislo,
  rightarg  = rodne_cislo,
  procedure = rodne_cislo_ne,
  commutator = <>,
  negator    = =,
  restrict   = neqsel,
  join       = neqjoinsel
);

Pokud zkusíte indexovat dle sloupce rc v tabulce osoby, pravděpodobně obdržíte chybovou hlášku o tom, že pro daný datový typ není určena třída operátoru pro přístup do "btree".

To proto, že v PostgreSQL musí mít každý typ určen jednu (nejčastější případ) nebo několik tzv. tříd operátorů (např. pro komplexní čísla mohu budovat index pro absolutní velikost čísla, pro reálnou nebo imaginární část). Třída operátorů určuje, které operátory se budou používat pro indexování a jaký mají sémantický význam, tzv Strategy Number.

Pro B-tree jsou určená tato Strategy Number

Číslo strategie
Menší než 1
Menší nebo roven 2
Roven 3
Větší nebo roven 4
Větší než 5

PostgreSQL umožňuje používat několik indexovacích metod (R-tree, Hash, GiST). Každá metoda má i vlastní tabulku Strategy Numbers. Kromě toho je potřeba definovat i tzv. Support rutinu. Pro B-tree index je pouze jedna (Strategy Number je 1). Jejími parametry jsou dva klíče a výsledkem celé číslo. Záporné, pokud je první parametr menší než druhý, nula, pokud jsou si rovny, a kladné číslo, pokud je první parametr větší než druhý.

Základ Support funkce máme. Stačí ji jen zpřístupnit v PostgreSQL

PG_FUNCTION_INFO_V1 (rodne_cislo__cmp);

Datum
rodne_cislo__cmp (PG_FUNCTION_ARGS)
{
  rodne_cislo *rc1 = PG_GETARG_RC_P (0);
  rodne_cislo *rc2 = PG_GETARG_RC_P (1);

  PG_RETURN_INT32 (rodne_cislo_cmp (rc1, rc2));
}

CREATE OR REPLACE FUNCTION rodne_cislo_cmp (rodne_cislo, rodne_cislo)
  RETURNS int AS 'rc.so', 'rodne_cislo__cmp' LANGUAGE 'C' STRICT;

CREATE OPERATOR CLASS rodne_cislo_ops
  DEFAULT FOR TYPE rodne_cislo USING btree AS
    OPERATOR  1	   <>,
    OPERATOR  2	   <=,
    OPERATOR  3	   = ,
    OPERATOR  4	   >=,
    OPERATOR  5	   >,
    FUNCTION  1	   rodne_cislo_cmp (rodne_cislo, rodne_cislo);

V tuto chvíli je již náš typ rodne_cislo plnohodnotný datový typ, který můžeme použít jako primární klíč. Pro praktické použití je třeba poznamenat, že rodná čísla vydaná před rokem 1953 nemusí být jedinečná, jak se zjistilo při kuponové privatizaci. Pokud používáte PostgreSQL v7.3 pak budete muset ještě nastavit příslušná práva ostatním uživatelům (v tomto případě zřejmě PUBLIC) příkazem GRANT.

Přístup k interním funkcím serveru

V předchozích kapitolách jsem navrhovali funkce, jejichž výsledek závisel pouze na parametrech funkce, a které z celého systému PostgreSQL zatím jen datové struktury. Rozhraní SPI (Server Programming Interface), které máme k dispozici, nám však dovoluje mnohem více.

Vzpomeňte si na příklad agregační funkce html_list implementované v PL/pgSQL. Tato funkce vrací seznam položek v sloupci formátovaný jako html seznam. Totožnou funkci bude mít funkce html_list. Jejími parametry bude SQL dotaz, sloupec, jehož položky chceme uložit do seznamu, maximální počet zpracovávaných řádek a velikost bloku alokované paměti.

Pro náš příklad si vystačíme s pěti základními funkcemi: SPI_connect, SPI_finish, SPI_exec, SPI_fnumber a SPI_getvalue. S touto množinou funkcí dokážeme pouze číst výsledek dotazu, nicméně to v tuto chvíli dostačující. Funkce SPI_connect inicializuje rozhraní a musí proto být volána jako první. Naproti tomu SPI_finish uvolňuje veškerou paměť alokovanou během běhu funkce funkcí palloc a musí být proto volána jako poslední. Pokud naše funkce vrací ukazatel na paměť alokovanou v proceduře (u všech hodnot předávaných pdkazem) nesmí se paměť alokovat příkazem palloc. To z toho důvodu, že ještě než stihneme hodnotu předat volající funkci, SPI_finish stihne pamět s touto hodnotou uvolnit. Tento "neřešitelný"problém řeší alternativní procedury pro práci s pamětí: SPI_palloc a SPI_replloc. Volání SPI_finish nemá vliv na paměť alokovanou těmito funkcemi. Pokud dojde k ukončení funkce voláním elog (ERROR, ...) potom je SPI_finish voláno automaticky.

Pokud dojde k chybě, pak SPI funkce vrací obvykle záporné číslo. Po aktivaci rozhraní můžeme funkcí SPI_exec nechat provést SQL příkaz. Druhým parametrem funkce SPI_exec je maximální počet vrácených záznamů. Pokud je nula, pak se použijí všechny záznamy. Po bezchybném provedení dotazu můžeme používat globální proměnnou SPI_processed obsahující počet vrácených záznamů a globální ukazatel SPI_typtable.

Ukazatel SPI_tuptable je ukazatel na strukturu obsahující jednak pole s vrácenými řádkami a odkaz na popis struktury vrácených dat. Pro získání hodnoty ve sloupečku potřebujeme vědět pořadí sloupce v dotazu. Pořadí vrací funkce SPI_fnumber, např.

sloupec = SPI_fnumber(SPI_tuptable->tupdesc, "jmeno");
if (sloupec <= 0)
  elog (ERROR, "SPI_ERROR_NOATTRIBUTE");

Přístup k n tému sloupečku x tého řádku zajišťuje funkce SPI_getvalue. Je nutné si uvědomovat, že všechny hodnoty vrácené SPI_getvalue jsou textové. Získání první hodnoty prvního řádku provedeme příkazem:

printf ("%s", SPI_getvalue(
			SPI_tuptable->vals[0], 
			SPI_tuptable->tupdesc, 1));

Abych urychlil provádění funkce, nealokuji paměť při každém přidání řetězce, ale po blocích - stránkách, jejichž velikost si sami určíme.

text *sts_strcatex2 (char *str, text *txt, 
                     long *zapsano, long *zbyva, long stranka)
{
  long len = strlen (str);
  if (len > stranka)
    elog (ERROR, "Retezec je vetsi nez velikost stranky");
  if (*zbyva <len)
    {
      long nsize = (((*zapsano + len) / stranka) + 1) * stranka;
      txt = (text *) SPI_repalloc (txt, nsize + VARHDRSZ);
      *zbyva = nsize - *zapsano;
    }
  memcpy ((char*)VARDATA (txt) + *zapsano, str, len);
  *zapsano += len; VARATT_SIZEP (txt) = *zapsano + VARHDRSZ;
  *zbyva -= len;
  return (txt);
}

Paměť alokuji voláním funkce SPI_repalloc a nikoliv repalloc z již zmíněného důvodu - pokud bych použil palloc, pak fce. SPI_finish by tuto paměť uvolnila. Následné volání PG_RETURN_TEXT_P by vracelo neplatný pointer, což vede (jak jsem se sám přesvědčil zhruba v 80% k pádu backendu).

Základ funkce html_list je následující:

SPI_connect ();

  if ((ret = SPI_exec (query, max)) <0)
    elog (ERROR, "SPI_exec vrátilo %d", ret);
  
  sloupec = SPI_fnumber(SPI_tuptable->tupdesc, jm_sloupce);

  if (sloupec <= 0)
    elog (ERROR, "SPI_ERROR_NOATTRIBUTE");
  
  zapsano = 0; zbyva = stranka;
  txt = (text*) SPI_palloc (VARHDRSZ + zbyva);

  txt = sts_strcatex2 ("<ul>\n", txt,&zapsano,&zbyva, stranka);

  for (radek = 0; radek <SPI_processed; radek++)
    {
      txt = sts_strcatex2 ("<li>", txt,&zapsano,&zbyva, stranka);
      txt = sts_strcatex2 (SPI_getvalue (SPI_tuptable->vals[radek], 
      					SPI_tuptable->tupdesc, sloupec),
    			  txt,&zapsano,&zbyva, stranka);
      txt = sts_strcatex2 ("</li>\n", txt,&zapsano,&zbyva, stranka);
    }
  txt = sts_strcatex2 ("</ul>", txt,&zapsano,&zbyva, stranka);
  SPI_finish ();
  PG_RETURN_TEXT_P(txt);

K textu není co dodat. V cyklu čtu jednotlivé hodnoty sloupce (SPI_getvalue vrací vždy ukazatel na řetězec), které přidávám do struktury txt typu text.

Po překladu můžeme fukci zaregistrovat v PostgreSQL

CREATE OR REPLACE FUNCTION html_list (cstring, cstring, int, int)
  RETURNS text AS 'html_list.so','html_list' LANGUAGE 'C';

Tabulka, kterou jsem pro test použil obsahuje 192 záznamu, výsledkem je text o délce mírne presahující 6KB. Pro změření casu jsem napsal krátký skript v Pythonu. Jelikož jsem chtel použít profiler Pythonu, rozdělil jsem prřkazy do samostatných funkcí.

#!/usr/local/bin/python
import psycopg, time

def plain(cur):
    cur.execute ("SELECT jmeno from jmena")

def html_list(cur):
    cur.execute ("SELECT html_list ('select jmeno from jmena','jmeno',0,8000)")
    rows = cur.fetchall ()
    print rows[0][0]

def pyt(cur):
    cur.execute ("SELECT jmeno from jmena")
    rows = cur.fetchall ()
    print "<ul>"
    for r in rows:
        print "<li>%s</li>" % r[0]
    print "</ul>"


def main ():
    for i in range(100):
        con = psycopg.connect ("dbname=testdb011")
        cur = con.cursor ()
        plain(cur)
        html_list(cur)
        pyt(cur)

import profile
profile.run ('main()')
<pre>
Výsledkem je tato tabulka
<pre>
 ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    4.670    4.670 <string>:1(?)
        1    0.090    0.090    4.760    4.760 profile:0(main())
        0    0.000             0.000          profile:0(profiler)
      100    3.670    0.037    3.670    0.037 test.py:12(pyt)
        1    0.620    0.620    4.670    4.670 test.py:20(main)
      100    0.250    0.003    0.250    0.003 test.py:4(plain)
      100    0.130    0.001    0.130    0.001 test.py:7(html_list)

Z ní je zrejmé, že vytvorení seznamu funkcí html_list je oproti "normálnímu" sestavení seznamu zhruba 28 krát rychlejší. Duvodem jsou nejspíš pomalalejší operace Pythonu s řetězci, ale i větší režie spojená s prochazením pole v Pythonu.

Tabulkové funkce

Table Function API umožňuje navrhovat vlastní funkce vracející tabulku (dále tabulkové funkce TF). TF může vracet tabulku obsahující skalární nebo složené (více sloupcové) typy. TF jsou specifické svým opakovaným voláním při vracení tabulky - TF je volána pro každý vrácený řádek.

Pokud budeme pracovat s vícesloupcovými tabulkami (a to ve většině) případů budeme, musíme dokázat vytvořit hodnotu složeného typu. Postup je zhruba následující: vytvoříme jakousi šablonu, pak podle této šablony prostor pro samotná data, a na závěr tento prostor vyplníme vlastními daty, kdy my data poskytujeme v cstring formátu a funkce PostgreSQL se pak postarají o správnou konverzi (podle šablony) do typů PostgreSQL. Mohou nastat dva základní případy a to, když existuje složený typ

tupdesc = RelationNameGetTupleDesc ("_tabsin");

nebo, pokud složený typ dynamicky definujeme

    tupdesc = CreateTemplateTupleDesc (8 , false);
    TupleDescInitEntry (tupdesc,  1, "Typ",      CHAROID,    -1, 0, false);
    TupleDescInitEntry (tupdesc,  2, "Prava",    VARCHAROID, -1, 0, false);
    TupleDescInitEntry (tupdesc,  3, "Uzivatel", VARCHAROID, -1, 0, false);
    TupleDescInitEntry (tupdesc,  4, "Skupina",  VARCHAROID, -1, 0, false);
    TupleDescInitEntry (tupdesc,  5, "Velikost", INT4OID,    -1, 0, false);
    TupleDescInitEntry (tupdesc,  6, "Vytvoreno",ABSTIMEOID, -1, 0, false);
    TupleDescInitEntry (tupdesc,  7, "Modif",    ABSTIMEOID, -1, 0, false);
    TupleDescInitEntry (tupdesc,  8, "Jmeno",    VARCHAROID, -1, 0, false);

Id typů najdeme v catalog/pg_type.h. Další postup je pak stejný pro obě zmíněné varianty:

  slot = TupleDescGetSlot (tupdesc);
  attinmeta = TupleDescGetAttInMetadata (tupdesc);
      ... plnění hodnot values ....
  tuple = BuildTupleFromCStrings (attinmeta, values);
  result = TupleGetDatum (slot, tuple);

Poslední příkaz provádí konverzi typu Tuple na Datum, který jediný můžeme vrátit. TF musí používat V1 volající konvenci. Konverzi do nativních typů PostgreSQL provádí funkce BuildTupleFromCStrings. Parametrem je struktura typu AttInMetadata (šablona pro vytvoření hodnoty) a pole s ukazateli na jednotlivé hodnoty v textovém formátu. Pokud je některý z ukazatelů NULL, pak se jako konvertovaná hodnota použije NULL. Struktury tabdesc, slot a attinmeta lze (a měly by se) použít opakovaně (pro každý řádek). Je neefektivní je vytvářet pro každý vrácený řádek znovu.

Dříve než se budu věnovat samotným tabulkovým funkcím, je třeba alespoň nastínit způsob jakým PostgreSQL přiděluje dynamickou paměť. Paměťový prostor můžeme dynamicky alokovat voláním funkcí palloc, případně změnit jeho velikost funkcí repalloc. Paměť je přidělována z tzv. aktuálního paměťového kontextu. Standardně je paměťový kontext vytvořen při staru funkce a zrušen při po doběhu funkce. Tím je zajištěno, že se veškerá dynamická paměť alokovaná funkcí nebo funkcemi, které volala, uvolní.

Toto chování je ale v případě tabulkových funkcí nežádoucí. Tabulková funkce se spouští opakovaně pro každý řádek vrácené tabulky. My ale potřebujeme prostor, kam můžeme uložit své datové struktury, které budou existovat po celou dobu opakovaného volání tabulkové funkce (výše zmíněné struktury). Proto máme k dispozici multicall paměťový kontext, který je uvolněn až po posledním volání tabulkové funkce (to se pozná tak, že funkce nevrací žádná data). Každý paměťový kontext se musí ještě aktivovat voláním fce. MemoryContextSwitchTo.

if (SRF_IS_FIRSTCALL ())
  {
    MemoryContext  oldcontext;
    funcctx = SRF_FIRSTCALL_INIT ();
    oldcontext = MemoryContextSwitchTo (funcctx->multi_call_memory_ctx);

       ... získání ukazatelů z multicall kontextu

    MemoryContextSwitchTo (oldcontext);
       ... od teto chvile vse po starem
  }
funcctx = SRF_PERCALL_SETUP ();

Funkce SRF_IS_FIRSTCALL vrací nenulovou hodnotu, pokud je funkce spuštěna poprvé (v rámci vyřizování jedné žádosti o tabulku, nikoliv po dobu existence knihovny v paměti). SRF_FIRSTCALL_INIT vytvoří multicall paměťový kontext. Ze struktury functx ještě použijeme pole user_fctx, které použijeme k uložení ukazatele na vlastní "statická" data.

Prvním příkladem TF je funkce fcetabsin, která vrací tabulku s hodnotami funkce sin. Parematry funkce mají význam od, do a krok.

#include "postgres.h"
#include "funcapi.h"
#include <math.h>

typedef struct fcetabsin_info {
  float8 iter;
  float8 krok;
  float8 max;
} fcetabsin_info;

PG_FUNCTION_INFO_V1 (fcetabsin);

Datum
fcetabsin (PG_FUNCTION_ARGS)
{
  FuncCallContext *funcctx;
  TupleDesc        tupdesc;
  TupleTableSlot  *slot;
  AttInMetadata   *attinmeta;
  fcetabsin_info  *fceinfo;

  if (SRF_IS_FIRSTCALL ())
    {
      MemoryContext  oldcontext;
      funcctx = SRF_FIRSTCALL_INIT ();
      oldcontext = MemoryContextSwitchTo (funcctx->multi_call_memory_ctx);

      tupdesc = RelationNameGetTupleDesc ("_tabsin");
      slot = TupleDescGetSlot (tupdesc);
      funcctx -> slot = slot;
      attinmeta = TupleDescGetAttInMetadata (tupdesc);
      funcctx -> attinmeta = attinmeta;

      fceinfo = (fcetabsin_info*) palloc (sizeof (fcetabsin_info));
      fceinfo -> iter = PG_GETARG_FLOAT8 (0);
      fceinfo -> max  = PG_GETARG_FLOAT8 (1);
      fceinfo -> krok = PG_GETARG_FLOAT8 (2);
      funcctx -> user_fctx = (void*) fceinfo;

      MemoryContextSwitchTo (oldcontext);
    }
  funcctx = SRF_PERCALL_SETUP ();
  fceinfo = (fcetabsin_info*) funcctx -> user_fctx;
  if (fceinfo -> iter <= fceinfo -> max)
    {
      Datum result; char **values;
      HeapTuple tuple;

      values = (char **) palloc (2 * sizeof (char *));
      values [0] = (char*) palloc (16 * sizeof (char));
      values [1] = (char*) palloc (16 * sizeof (char));

      snprintf (values [0], 16, "%f", fceinfo -> iter);
      snprintf (values [0], 16, "%f", sin(fceinfo -> iter));
      fceinfo -> iter += fceinfo -> krok;

      tuple = BuildTupleFromCStrings (funcctx -> attinmeta, values);
      result = TupleGetDatum (funcctx -> slot, tuple);

      SRF_RETURN_NEXT (funcctx, result);
    }
  else
    {
      SRF_RETURN_DONE (funcctx);
    }
}

Funkci zaregistrujeme SQL příkazy:

CREATE TYPE _tabsin AS (i float8, j float8);

CREATE OR REPLACE FUNCTION fcetabsin (float8, float8, float8)
 RETURNS SETOF _tabsin AS 'tab_fce_sin.so', 'fcetabsin' LANGUAGE 'C';

a otestujeme příkazem

SELECT * FROM fcetabsin (0.1, 3, 0.1);

Příklad je hodně školský, ale díky tomu je ještě přehledný. Podobný příklad (značně smysluplnější) naleznete v contrib, kde jsou funkce vracející tabulky s číselnými hodnotami v daném rozdělení, atd.

Druhým příkazem je funkce ls, vracející výpis adresáře.

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/dir.h>
#include <stdio.h>
#include "postgres.h"
#include "funcapi.h"
#include "catalog/pg_type.h"
#include <pwd.h>
#include <time.h>
#include <grp.h>

char *modes [] = {
  "---", "--x", "-w-", "-wx",
  "r--", "r-x", "rw-", "rwx"
};

typedef struct UsrFctx {
  DIR *dp;
  char *name;
  char **values;
} UsrFctx;

char **fill_array (char **polozka, char *adresar, struct direct *dir)
{
  char uplna_cesta [1024]; struct stat sbuf;
  char prava [10]; int i, j; 
  struct passwd *pw; struct group *gw;
  struct tm *loctime;

  sprintf (uplna_cesta, "%s/%s", adresar, dir-> d_name);
  stat (uplna_cesta,&sbuf);

  switch (sbuf.st_mode& S_IFMT) 
    {
    case S_IFREG: snprintf (polozka [0], 2, "-"); break; 
    case S_IFDIR: snprintf (polozka [0], 2, "d"); break; 
    case S_IFCHR: snprintf (polozka [0], 2, "c"); break; 
    case S_IFBLK: snprintf (polozka [0], 2, "b"); break; 
    default:      snprintf (polozka [0], 2, "?"); break;
    }

  prava [0] = '\0';
  for (i = 2; i >= 0; i--)
    {
      j = (sbuf.st_mode >> (i *3))& 07;
      strcat (prava, modes [j]);
    }
  if ((sbuf.st_mode> S_ISUID) != 0) prava [2] = 's';
  if ((sbuf.st_mode> S_ISGID) != 0) prava [5] = 's';
  if ((sbuf.st_mode> S_ISVTX) != 0) prava [8] = 't';

  snprintf (polozka [1], 16, "%s", prava);
  if ((pw = getpwuid (sbuf.st_uid)) != 0)
    snprintf (polozka [2], 32, "%s", pw -> pw_name);
  else
    snprintf (polozka [2], 32, "%d", sbuf.st_uid);
  if ((gw = getgrgid (sbuf.st_gid)) != 0)
    snprintf (polozka [3], 32, "%s", gw -> gr_name);
  else
    snprintf (polozka [3], 32, "%d", sbuf.st_gid);
  snprintf (polozka [4], 16, "%d", sbuf.st_size);
  loctime = localtime (&sbuf.st_ctime);
  strftime (polozka [5], 32, "%F %T", loctime);
  loctime = localtime (&sbuf.st_mtime);
  strftime (polozka [6], 32, "%F %T", loctime);
  snprintf (polozka [7], 255, "%s", dir -> d_name);

  return polozka;
}

PG_FUNCTION_INFO_V1 (ls);

Datum
ls (PG_FUNCTION_ARGS)
{
  FuncCallContext *funcctx; TupleDesc        tupdesc;
  TupleTableSlot  *slot;    AttInMetadata   *attinmeta;
  UsrFctx *usrfctx; struct direct *dir;

  if (PG_ARGISNULL (0))
    elog (ERROR, "Parametrem funkce musi byt adresar");

  if (SRF_IS_FIRSTCALL ())
    {

      MemoryContext  oldcontext;
      funcctx = SRF_FIRSTCALL_INIT ();
      oldcontext = MemoryContextSwitchTo (funcctx->multi_call_memory_ctx);
      
      usrfctx = (UsrFctx*) palloc (sizeof (UsrFctx));
      usrfctx -> name = PG_GETARG_CSTRING (0);  

      if ((usrfctx -> dp = opendir (usrfctx -> name)) == NULL)
	elog (ERROR, "%s nelze otevrit",usrfctx ->  name);

      usrfctx -> values = (char **) palloc (8 * sizeof (char *));
      usrfctx -> values  [0] = (char*) palloc   (2 * sizeof (char));
      usrfctx -> values  [1] = (char*) palloc  (16 * sizeof (char));
      usrfctx -> values  [2] = (char*) palloc  (32 * sizeof (char));
      usrfctx -> values  [3] = (char*) palloc  (32 * sizeof (char));
      usrfctx -> values  [4] = (char*) palloc  (16 * sizeof (char));
      usrfctx -> values  [5] = (char*) palloc  (32 * sizeof (char));
      usrfctx -> values  [6] = (char*) palloc  (32 * sizeof (char));
      usrfctx -> values  [7] = (char*) palloc (255 * sizeof (char));

      funcctx -> user_fctx = (void *)usrfctx;
   
      tupdesc = CreateTemplateTupleDesc (8 , false);
      TupleDescInitEntry (tupdesc,  1, "Typ",         CHAROID,    -1, 0, false);
      TupleDescInitEntry (tupdesc,  2, "Prava",       VARCHAROID, -1, 0, false);
      TupleDescInitEntry (tupdesc,  3, "Uzivatel",    VARCHAROID, -1, 0, false);
      TupleDescInitEntry (tupdesc,  4, "Skupina",     VARCHAROID, -1, 0, false);
      TupleDescInitEntry (tupdesc,  5, "Velikost",    INT4OID,    -1, 0, false);
      TupleDescInitEntry (tupdesc,  6, "Vytvoreno",   ABSTIMEOID, -1, 0, false);
      TupleDescInitEntry (tupdesc,  7, "Modifikovano",ABSTIMEOID, -1, 0, false);
      TupleDescInitEntry (tupdesc,  8, "Jmeno",       VARCHAROID, -1, 0, false); 


      slot = TupleDescGetSlot (tupdesc); 
      funcctx -> slot = slot;

      attinmeta = TupleDescGetAttInMetadata (tupdesc);
      funcctx -> attinmeta = attinmeta;

      MemoryContextSwitchTo (oldcontext);
      
    }
  funcctx = SRF_PERCALL_SETUP ();
  usrfctx = (UsrFctx*) funcctx -> user_fctx;

  if ((dir = readdir (usrfctx -> dp)) != NULL)
    {
      Datum result;  HeapTuple tuple;
      while (dir->d_ino == 0)
	{
	  if ((dir = readdir (usrfctx -> dp)) != NULL)
	    {
	      closedir (usrfctx -> dp);
	      SRF_RETURN_DONE (funcctx);      
	    }
	}
      fill_array (usrfctx -> values, usrfctx -> name, dir);

      tuple = BuildTupleFromCStrings (funcctx -> attinmeta,
				      usrfctx -> values);
      result = TupleGetDatum (funcctx -> slot, tuple);
      SRF_RETURN_NEXT (funcctx, result);
    }
  else
    {
      closedir (usrfctx -> dp);
      SRF_RETURN_DONE (funcctx);      
    } 
}

Funkci zaregistrujte

CREATE OR REPLACE FUNCTION ls (cstring) RETURNS SETOF record
   AS 'ls.so', 'ls', LANGUAGE 'C';

Poté si můžeme nechat vypsat obsah adresáře.

testdb011=#select * from ls ('/home/') as 
  (typ "char", prava varchar, uzivatel varchar, skupina varchar, 
velikost int4, vytvoreno abstime, modifikovano abstime, jmeno varchar);
 typ |   prava   | uzivatel | skupina  | velikost |       vytvoreno        |    modifikovano        | jmeno
 ----+-----------+----------+----------+----------+------------------------+------------------------+----------
 d   | rwxr-xr-x | root     | root     |     4096 | 2002-09-03 12:04:09-04 | 2002-09-03 12:04:09-04 | .
 d   | rwxr-xr-x | root     | root     |     4096 | 2002-11-19 20:47:43-05 | 2002-11-19 20:47:43-05 | ..
 d   | rwxr-xr-x | pavel    | users    |     4096 | 2002-11-19 22:25:56-05 | 2002-11-19 22:25:56-05 | pavel
 d   | rwx------ | zdenek   | zdenek   |     4096 | 2002-09-03 08:52:54-04 | 2002-09-03 08:52:54-04 | zdenek
 d   | rwx------ | postgres | postgres |     4096 | 2002-11-19 22:26:02-05 | 2002-11-19 22:26:02-05 | postgres
(5 rows)

Tato funkce je již užitečnější. Díky ní můžeme z PL/pgSQL monitorovat obsah adresářů, výpisem adresáře /tmp můžeme zjistit, kdy a jak dlouho je PostgreSQL spuštěn, atd. Určite můžeme napsat TF pro import CSV nebo XML souborů, pro rozkouskování MIME dokumentů, pro přístup k IMAP, POP3 nebo LDAP serverům.