blob: a3dd4372a9f6d5c88019c9b3e909617583587b99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbcat.c
* Provides routines for reading info from mpp schema tables
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "catalog/catquery.h"
#include "catalog/pg_namespace.h"
#include "catalog/indexing.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "cdb/cdbcat.h"
#include "cdb/cdbrelsize.h"
#include "cdb/cdbvars.h" /* Gp_role */
#include "catalog/pg_type.h"
#include "utils/array.h"
static void extract_INT2OID_array(Datum array_datum, int *lenp, int16 **vecp);
/*
* GpPolicyCopy -- Return a copy of a GpPolicy object.
*
* The copy is palloc'ed in the specified context.
*/
GpPolicy *
GpPolicyCopy(MemoryContext mcxt, const GpPolicy *src)
{
GpPolicy *tgt;
size_t nb;
if (!src)
return NULL;
nb = sizeof(*src) - sizeof(src->attrs) + src->nattrs * sizeof(src->attrs[0]);
tgt = (GpPolicy *)MemoryContextAlloc(mcxt, nb);
memcpy(tgt, src, nb);
return tgt;
} /* GpPolicyCopy */
/*
* GpPolicyEqual -- Test equality of policies (by value only).
*/
bool
GpPolicyEqual(const GpPolicy *lft, const GpPolicy *rgt)
{
int i;
if (lft == rgt)
return true;
if (!lft || !rgt)
return false;
if ( lft->ptype != rgt->ptype )
return false;
if (lft->bucketnum != rgt->bucketnum)
return false;
if ( lft->nattrs != rgt->nattrs )
return false;
for ( i = 0; i < lft->nattrs; i++ )
if ( lft->attrs[i] != rgt->attrs[i] )
return false;
return true;
} /* GpPolicyEqual */
/*
* GpPolicyFetch
*
* Looks up a given Oid in the gp_distribution_policy table.
* If found, returns an GpPolicy object (palloc'd in the specified
* context) containing the info from the gp_distribution_policy row.
* Else returns NULL.
*
* The caller is responsible for passing in a valid relation oid. This
* function does not check and assigns a policy of type POLICYTYPE_ENTRY
* for any oid not found in gp_distribution_policy.
*/
GpPolicy *
GpPolicyFetch(MemoryContext mcxt, Oid tbloid)
{
GpPolicy *policy = NULL; /* The result */
Relation gp_policy_rel;
cqContext cqc;
HeapTuple gp_policy_tuple = NULL;
size_t nb;
/*
* Skip if qExec or utility mode.
*/
if (Gp_role != GP_ROLE_DISPATCH)
return NULL;
/*
* We need to read the gp_distribution_policy table
*/
gp_policy_rel = heap_open(GpPolicyRelationId, AccessShareLock);
/*
* Select by value of the localoid field
*/
gp_policy_tuple = caql_getfirst(
caql_addrel(cqclr(&cqc), gp_policy_rel),
cql("SELECT * FROM gp_distribution_policy "
" WHERE localoid = :1 ",
ObjectIdGetDatum(tbloid)));
/*
* Read first (and only ) tuple
*/
if (HeapTupleIsValid(gp_policy_tuple))
{
bool isNull;
Datum attr;
Datum bucketnum;
int i,
nattrs = 0;
int16 *attrnums = NULL;
/*
* Get the attributes on which to partition.
*/
attr = heap_getattr(gp_policy_tuple, Anum_gp_policy_attrnums,
RelationGetDescr(gp_policy_rel), &isNull);
/*
* Get distribution keys only if this table has a policy.
*/
if(!isNull)
{
extract_INT2OID_array(attr, &nattrs, &attrnums);
Assert(nattrs >= 0);
}
/*
* Get the partition number.
*/
bucketnum = heap_getattr(gp_policy_tuple, Anum_gp_policy_bucketnum,
RelationGetDescr(gp_policy_rel), NULL);
/* Create an GpPolicy object. */
nb = sizeof(*policy) - sizeof(policy->attrs) + nattrs * sizeof(policy->attrs[0]);
policy = (GpPolicy *)MemoryContextAlloc(mcxt, nb);
policy->ptype = POLICYTYPE_PARTITIONED;
policy->bucketnum = DatumGetInt32(bucketnum);
policy->nattrs = nattrs;
for (i = 0; i < nattrs; i++)
{
policy->attrs[i] = attrnums[i];
}
}
/*
* Cleanup the scan and relation objects.
*/
heap_close(gp_policy_rel, AccessShareLock);
/* Interpret absence of a valid policy row as POLICYTYPE_ENTRY */
if (policy == NULL)
{
nb = sizeof(*policy) - sizeof(policy->attrs);
policy = (GpPolicy *)MemoryContextAlloc(mcxt, nb);
policy->ptype = POLICYTYPE_ENTRY;
policy->bucketnum = GetPlannerSegmentNum();
policy->nattrs = 0;
}
return policy;
} /* GpPolicyFetch */
/*
* Extract len and pointer to buffer from an int16[] (vector) Datum
* representing a PostgreSQL INT2OID type.
*/
static void
extract_INT2OID_array(Datum array_datum, int *lenp, int16 **vecp)
{
ArrayType *array_type;
Assert(lenp != NULL);
Assert(vecp != NULL);
array_type = DatumGetArrayTypeP(array_datum);
Assert(ARR_NDIM(array_type) == 1);
Assert(ARR_ELEMTYPE(array_type) == INT2OID);
Assert(ARR_LBOUND(array_type)[0] == 1);
*lenp = ARR_DIMS(array_type)[0];
*vecp = (int16 *) ARR_DATA_PTR(array_type);
return;
}
/*
* Sets the policy of a table into the gp_distribution_policy table
* from a GpPolicy structure.
*
*/
void
GpPolicyStore(Oid tbloid, const GpPolicy *policy)
{
Relation gp_policy_rel;
HeapTuple gp_policy_tuple = NULL;
ArrayType *attrnums;
bool nulls[3];
Datum values[3];
cqContext cqc;
cqContext *pcqCtx;
Insist(policy->ptype == POLICYTYPE_PARTITIONED);
/*
* Open and lock the gp_distribution_policy catalog.
*/
gp_policy_rel = heap_open(GpPolicyRelationId, RowExclusiveLock);
pcqCtx = caql_beginscan(
caql_addrel(cqclr(&cqc), gp_policy_rel),
cql("INSERT INTO gp_distribution_policy ",
NULL));
/*
* Convert C arrays into Postgres arrays.
*/
if (policy->nattrs > 0)
{
int i;
Datum *akey;
akey = (Datum *) palloc(policy->nattrs * sizeof(Datum));
for (i = 0; i < policy->nattrs; i++)
akey[i] = Int16GetDatum(policy->attrs[i]);
attrnums = construct_array(akey, policy->nattrs,
INT2OID, 2, true, 's');
}
else
{
attrnums = NULL;
}
nulls[0] = false;
nulls[1] = false;
nulls[2] = false;
values[0] = ObjectIdGetDatum(tbloid);
values[1] = Int32GetDatum(policy->bucketnum);
if (attrnums)
values[2] = PointerGetDatum(attrnums);
else
nulls[2] = true;
gp_policy_tuple = caql_form_tuple(pcqCtx, values, nulls);
/* Insert tuple into the relation */
caql_insert(pcqCtx, gp_policy_tuple); /* implicit update of index as well*/
/*
* Close the gp_distribution_policy relcache entry without unlocking.
* We have updated the catalog: consequently the lock must be held until
* end of transaction.
*/
caql_endscan(pcqCtx);
heap_close(gp_policy_rel, NoLock);
} /* GpPolicyStore */
/*
* Sets the policy of a table into the gp_distribution_policy table
* from a GpPolicy structure.
*/
void
GpPolicyReplace(Oid tbloid, const GpPolicy *policy)
{
Relation gp_policy_rel;
HeapTuple gp_policy_tuple = NULL;
cqContext cqc;
cqContext *pcqCtx;
ArrayType *attrnums;
bool nulls[3];
Datum values[3];
bool repl[3];
Insist(policy->ptype == POLICYTYPE_PARTITIONED);
/*
* Open and lock the gp_distribution_policy catalog.
*/
gp_policy_rel = heap_open(GpPolicyRelationId, RowExclusiveLock);
pcqCtx = caql_addrel(cqclr(&cqc), gp_policy_rel);
/*
* Convert C arrays into Postgres arrays.
*/
if (policy->nattrs > 0)
{
int i;
Datum *akey;
akey = (Datum *) palloc(policy->nattrs * sizeof(Datum));
for (i = 0; i < policy->nattrs; i++)
akey[i] = Int16GetDatum(policy->attrs[i]);
attrnums = construct_array(akey, policy->nattrs,
INT2OID, 2, true, 's');
}
else
{
attrnums = NULL;
}
nulls[0] = false;
nulls[1] = false;
nulls[2] = false;
values[0] = ObjectIdGetDatum(tbloid);
values[1] = Int32GetDatum(policy->bucketnum);
if (attrnums)
values[2] = PointerGetDatum(attrnums);
else
nulls[2] = true;
repl[0] = false;
repl[1] = false;
repl[2] = true;
if (policy->bucketnum > 0)
repl[1] = true;
/*
* Select by value of the localoid field
*/
gp_policy_tuple = caql_getfirst(
pcqCtx,
cql("SELECT * FROM gp_distribution_policy "
" WHERE localoid = :1 "
" FOR UPDATE ",
ObjectIdGetDatum(tbloid)));
/*
* Read first (and only ) tuple
*/
if (HeapTupleIsValid(gp_policy_tuple))
{
HeapTuple newtuple = caql_modify_current(pcqCtx, values,
nulls, repl);
caql_update_current(pcqCtx, newtuple);
/* and Update indexes (implicit) */
heap_freetuple(newtuple);
}
else
{
gp_policy_tuple = caql_form_tuple(pcqCtx, values, nulls);
caql_insert(pcqCtx, gp_policy_tuple);
/* and Update indexes (implicit) */
}
/*
* Close the gp_distribution_policy relcache entry without unlocking.
* We have updated the catalog: consequently the lock must be held until
* end of transaction.
*/
heap_close(gp_policy_rel, NoLock);
} /* GpPolicyReplace */
/*
* Removes the policy of a table from the gp_distribution_policy table
*
* Callers must check that there actually *is* a policy for the relation.
*/
void
GpPolicyRemove(Oid tbloid)
{
Relation gp_policy_rel;
cqContext cqc;
/*
* Open and lock the gp_distribution_policy catalog.
*/
gp_policy_rel = heap_open(GpPolicyRelationId, RowExclusiveLock);
/* Delete the policy entry from the catalog. */
if (0 == caql_getcount(
caql_addrel(cqclr(&cqc), gp_policy_rel),
cql("DELETE FROM gp_distribution_policy "
" WHERE localoid = :1 ",
ObjectIdGetDatum(tbloid))))
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("distribution policy for relation \"%d\" does not exist",
tbloid)));
}
/*
* Close the gp_distribution_policy relcache entry without unlocking.
* We have updated the catalog: consequently the lock must be held until
* end of transaction.
*/
heap_close(gp_policy_rel, NoLock);
} /* GpPolicyRemove */
/*
* Does the supplied GpPolicy support unique indexing on the specified
* attributes?
*
* If the table is distributed randomly, no unique indexing is supported.
* Otherwise, the set of columns being indexed should be a superset of the
* policy.
*
* If the proposed index does not match the distribution policy but the relation
* is empty and does not have a primary key or unique index, update the
* distribution policy to match the index definition (MPP-101), as long as it
* doesn't contain expressions.
*/
void
checkPolicyForUniqueIndex(Relation rel, AttrNumber *indattr, int nidxatts,
bool isprimary, bool has_exprs, bool is_pkey,
bool is_ukey)
{
Bitmapset *polbm = NULL;
Bitmapset *indbm = NULL;
int i;
GpPolicy *pol = rel->rd_cdbpolicy;
/*
* Firstly, unique/primary key indexes aren't supported if we're
* distributing randomly.
*/
if (pol->ptype == POLICYTYPE_PARTITIONED &&
pol->nattrs == 0 /* distributed randomly */)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("%s and DISTRIBUTED RANDOMLY are incompatible",
isprimary ? "PRIMARY KEY" : "UNIQUE"),
errOmitLocation(true)));
}
/*
* We use bitmaps to make intersection tests easier. As noted, order is
* not relevant so looping is just painful.
*/
for (i = 0; i < pol->nattrs; i++)
polbm = bms_add_member(polbm, pol->attrs[i]);
for (i = 0; i < nidxatts; i++)
{
if (indattr[i] < 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot create %s on system column",
isprimary ? "primary key" : "unique index")));
indbm = bms_add_member(indbm, indattr[i]);
}
Assert(bms_membership(polbm) != BMS_EMPTY_SET);
Assert(bms_membership(indbm) != BMS_EMPTY_SET);
/*
* NOTE: Do not change the policy according to PRIMARY KEY/UNIQUE index
* definitions. We always assume that the distributed policy was explicitly
* set. Because we can't get accurate cdbRelSize(rel) for magma format table.
*/
if (!bms_is_subset(polbm, indbm))
{
if ( is_pkey || is_ukey || has_exprs) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("%s must contain all columns in the "
"distribution key of relation \"%s\"",
isprimary ? "PRIMARY KEY" : "UNIQUE index",
RelationGetRelationName(rel)),
errOmitLocation(true)));
}
}
/*
* If the existing policy is not a subset, we must either error out or
* update the distribution policy. It might be tempting to say that even
* when the policy is a subset, we should update it to match the index
* definition. The problem then is that if the user actually wants to
* distribution on (a, b) but then creates an index on (a, b, c) we'll
* change the policy underneath them.
*
* What is really needed is a new field in gp_distribution_policy telling us
* if the policy has been explicitly set.
*/
/*
if (!bms_is_subset(polbm, indbm))
{
if (cdbRelSize(rel) != 0 || has_pkey || has_ukey || has_exprs)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("%s must contain all columns in the "
"distribution key of relation \"%s\"",
isprimary ? "PRIMARY KEY" : "UNIQUE index",
RelationGetRelationName(rel)),
errOmitLocation(true)));
}
else
{
// update policy since table is not populated yet. See MPP-101
GpPolicy *policy = palloc(sizeof(GpPolicy) +
(sizeof(AttrNumber) * nidxatts));
policy->ptype = POLICYTYPE_PARTITIONED;
policy->bucketnum = GetRelOpt_bucket_num_fromRel(rel, GetHashDistPartitionNum());
policy->nattrs = 0;
for (i = 0; i < nidxatts; i++)
policy->attrs[policy->nattrs++] = indattr[i];
GpPolicyReplace(rel->rd_id, policy);
if (isprimary)
elog(NOTICE, "updating distribution policy to match new primary key");
else
elog(NOTICE, "updating distribution policy to match new unique index");
}
}
*/
}