blob: 69664b3406e4e43c0c5c5417bf854eb3a6a62989 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* mlipc_access.c
* Test functions for Motion Layer IPC API. These are to be called from SQL
* for testing purposes.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "access/tupdesc.h"
#include "cdb/cdbvars.h"
#include "cdb/tupchunk.h"
#include "cdb/tupchunklist.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdblink.h"
#include "cdb/cdbutil.h"
#include "cdb/ml_ipc.h"
#include "executor/spi.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"
#include "cdbtest.h"
#include <sys/time.h>
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
/*
* NOTE: This function assumes/requires the following to be true:
* (1) This function MUST be called in utility mode.!
*
* (2) This function assumes that Motion Layer setup and cdblink setup has
* already occured magically.
*
* (3)
*
*/
PG_FUNCTION_INFO_V1(ml_ipc_bench);
Datum ml_ipc_bench(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
int call_cntr;
int max_calls;
TupleDesc tupdesc;
AttInMetadata *attinmeta;
uint32 num_msgs;
uint32 data_size;
int i;
int mySegIndex;
bool isPrimary;
int msgsReceived = 0;
int msgsSent = 0;
char *msg;
uint64 totalMsec;
GpRoleValue oldRole = Gp_role;
/* temporarily disable this routine. see TODO item below. */
ereport(ERROR, (errcode(ERRCODE_GP_FEATURE_NOT_YET),
errmsg("Interconnect testing through libcdbtest is currently disabled")
));
if( SRF_IS_FIRSTCALL() )
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/* total number of tuples to be returned */
funcctx->max_calls = 1;
/* Build a tuple description for a __testpassbyval tuple */
tupdesc = RelationNameGetTupleDesc("__ml_ipc_bench_results");
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
/*
* generate attribute metadata needed later to produce tuples from raw
* C strings
*/
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
//read in args
num_msgs = PG_GETARG_UINT32(0);
data_size = PG_GETARG_UINT32(1);
//set things up
getGpIdentity( &mySegIndex, &isPrimary );
//create our test message
msg = palloc(data_size);
for( i=0; i < data_size; i++ )
{
//TODO: create better test messages.
*(msg+i) = 'a';
}
/* Form a tuplechunk with the message as a payload */
TupleChunkListItem tcItem = (TupleChunkListItem)
palloc0(sizeof(TupleChunkListData) +
TUPLE_CHUNK_HEADER_SIZE +
strlen(msg));
tcItem->chunk_length = strlen(msg) + TUPLE_CHUNK_HEADER_SIZE;
SetTargetSegIdx(tcItem->chunk_data, mySegIndex);
SetQueryGroupID(tcItem->chunk_data, 1);
SetSourceSegIdx(tcItem->chunk_data, mySegIndex);
SetMotionNodeID(tcItem->chunk_data, 1);
SetChunkDataSize(tcItem->chunk_data, strlen(msg) );
SetChunkType(tcItem->chunk_data, TC_WHOLE );
memcpy( tcItem->chunk_data + TUPLE_CHUNK_HEADER_SIZE, msg, strlen(msg) );
bool done = false;
TupleChunkListItem recvTcItem;
TupleChunkListItem nextTcItem;
TupleChunkListItem lastTcItem;
//stat variables
struct timeval startTime;
struct timeval stopTime;
gettimeofday(&startTime, NULL);
while( ! done )
{
//try and send out
if( msgsSent != num_msgs)
{
/* TODO: we need to enable this call again in order to support this test routine!
if( SendTupleChunkToAMS( tcItem ) ){
msgsSent++;
}
*/
}
//try and receive
if( (recvTcItem = RecvTupleChunkFromAMS() ) != NULL )
{
nextTcItem = recvTcItem;
lastTcItem = recvTcItem;
//need to go through recvTcItem and count the items
while( nextTcItem != NULL )
{
msgsReceived++;
lastTcItem = nextTcItem;
nextTcItem = lastTcItem->p_next;
if( msgsReceived % 1000 == 0 )
elog( NOTICE, "recvd: %d", msgsReceived );
}
clearTCList(recvTcItem);
}
//test for exit
if( msgsSent == num_msgs && msgsReceived == num_msgs )
done = true;
}
gettimeofday(&stopTime, NULL);
totalMsec = ( (stopTime.tv_sec - startTime.tv_sec) * 1000) +
( (stopTime.tv_usec - startTime.tv_usec) / 1000 );
pfree(tcItem);
pfree(msg);
MemoryContextSwitchTo(oldcontext);
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
call_cntr = funcctx->call_cntr;
max_calls = funcctx->max_calls;
attinmeta = funcctx->attinmeta;
if (call_cntr < max_calls) /* do when there is more left to send */
{
Datum result;
HeapTuple tuple;
Datum resultValues[3]; /* Parts of the result set. */
char *resultNulls = " "; /* No NULLs in result set. */
int64 bytes = num_msgs * data_size;
double MBs = (double)bytes / (1024*1024);
double Secs = (double)totalMsec / 1000;
double bw = (double)MBs / Secs;
resultValues[0] = Int64GetDatum( totalMsec );
resultValues[1] = Float64GetDatum( &MBs );
resultValues[2] = Float64GetDatum( &bw );
tuple = heap_formtuple(funcctx->tuple_desc, resultValues, resultNulls);
/* make the tuple into a datum */
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}
else /* do when there is no more left */
{
SRF_RETURN_DONE(funcctx);
}
}