| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.metamodel.salesforce; |
| |
| import java.text.SimpleDateFormat; |
| import java.util.*; |
| |
| import org.apache.metamodel.MetaModelException; |
| import org.apache.metamodel.QueryPostprocessDataContext; |
| import org.apache.metamodel.UpdateScript; |
| import org.apache.metamodel.UpdateSummary; |
| import org.apache.metamodel.UpdateableDataContext; |
| import org.apache.metamodel.data.DataSet; |
| import org.apache.metamodel.data.FirstRowDataSet; |
| import org.apache.metamodel.query.FilterItem; |
| import org.apache.metamodel.query.FromItem; |
| import org.apache.metamodel.query.OperatorType; |
| import org.apache.metamodel.query.OrderByItem; |
| import org.apache.metamodel.query.Query; |
| import org.apache.metamodel.query.SelectItem; |
| import org.apache.metamodel.schema.Column; |
| import org.apache.metamodel.schema.ColumnType; |
| import org.apache.metamodel.schema.Schema; |
| import org.apache.metamodel.schema.Table; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.sforce.soap.partner.Connector; |
| import com.sforce.soap.partner.PartnerConnection; |
| import com.sforce.soap.partner.QueryResult; |
| import com.sforce.ws.ConnectionException; |
| import com.sforce.ws.ConnectorConfig; |
| |
| /** |
| * A datacontext that uses the Salesforce API. |
| * |
| * Metadata about schema structure is explored using 'describe' SOAP web services. |
| * |
| * Queries are fired using the SOQL dialect of SQL, see |
| * <a href= "http://www.salesforce.com/us/developer/docs/api/Content/sforce_api_calls_soql_select.htm" >SOQL |
| * reference</a>. |
| */ |
| public class SalesforceDataContext extends QueryPostprocessDataContext implements UpdateableDataContext { |
| |
| public static final TimeZone SOQL_TIMEZONE = TimeZone.getTimeZone("UTC"); |
| public static final String SOQL_DATE_FORMAT_IN = "yyyy-MM-dd"; |
| public static final String SOQL_DATE_FORMAT_OUT = "yyyy-MM-dd"; |
| public static final String SOQL_DATE_TIME_FORMAT_IN = "yyyy-MM-dd'T'HH:mm:ss.SSS"; |
| public static final String SOQL_DATE_TIME_FORMAT_OUT = "yyyy-MM-dd'T'HH:mm:ssZZZ"; |
| public static final String SOQL_TIME_FORMAT_IN = "HH:mm:ss.SSS"; |
| public static final String SOQL_TIME_FORMAT_OUT = "HH:mm:ssZZZ"; |
| |
| private static final Logger logger = LoggerFactory.getLogger(SalesforceDataContext.class); |
| |
| private final PartnerConnection _connection; |
| |
/**
 * Creates a {@code SalesforceDataContext} by logging in to Salesforce with the given
 * credentials, optionally against a specific endpoint.
 *
 * @param endpoint the URL set as both auth endpoint and service endpoint; when {@code null}
 *            no endpoint is set on the connector configuration
 * @param username the username to log in with
 * @param password the password to log in with
 * @param securityToken a security token that is appended to the password, or {@code null}
 *            to use the password as-is
 */
public SalesforceDataContext(String endpoint, String username, String password, String securityToken) {
    super(false);
    final PartnerConnection connection;
    try {
        final ConnectorConfig connectorConfig = new ConnectorConfig();
        connectorConfig.setUsername(username);
        if (securityToken == null) {
            connectorConfig.setPassword(password);
        } else {
            connectorConfig.setPassword(password + securityToken);
        }
        if (endpoint != null) {
            connectorConfig.setAuthEndpoint(endpoint);
            connectorConfig.setServiceEndpoint(endpoint);
        }
        connection = Connector.newConnection(connectorConfig);
    } catch (ConnectionException e) {
        throw SalesforceUtils.wrapException(e, "Failed to log in to Salesforce service");
    }
    _connection = connection;
}
| |
| public SalesforceDataContext(String username, String password, String securityToken) { |
| super(false); |
| try { |
| _connection = |
| Connector.newConnection(username, securityToken == null ? password : password + securityToken); |
| } catch (ConnectionException e) { |
| throw SalesforceUtils.wrapException(e, "Failed to log in to Salesforce service"); |
| } |
| } |
| |
| public SalesforceDataContext(String username, String password) { |
| super(false); |
| try { |
| _connection = Connector.newConnection(username, password); |
| } catch (ConnectionException e) { |
| throw SalesforceUtils.wrapException(e, "Failed to log in to Salesforce service"); |
| } |
| } |
| |
/**
 * Creates a {@code SalesforceDataContext} instance that uses an already established
 * Salesforce connection.
 *
 * @param connection the Salesforce connection to use (cannot be {@code null})
 * @throws IllegalArgumentException if {@code connection} is {@code null}
 */
public SalesforceDataContext(PartnerConnection connection) {
    super(false);
    if (connection != null) {
        _connection = connection;
    } else {
        throw new IllegalArgumentException("connection cannot be null");
    }
}
| |
| /** |
| * Returns the Salesforce connection being used by this datacontext. |
| * |
| * @return the Salesforce connection |
| */ |
| public PartnerConnection getConnection() { |
| return _connection; |
| } |
| |
| @Override |
| protected Schema getMainSchema() throws MetaModelException { |
| final SalesforceSchema schema = new SalesforceSchema(getMainSchemaName(), _connection); |
| return schema; |
| } |
| |
| @Override |
| protected String getMainSchemaName() throws MetaModelException { |
| return "Salesforce"; |
| } |
| |
| @Override |
| public DataSet executeQuery(Query query) { |
| final List<FromItem> fromItems = query.getFromClause().getItems(); |
| if (fromItems.size() != 1) { |
| // not a simple SELECT ... FROM [table] ... query, we need to use |
| // query post processing. |
| return super.executeQuery(query); |
| } |
| |
| final Table table = fromItems.get(0).getTable(); |
| if (table == null) { |
| return super.executeQuery(query); |
| } |
| |
| if (!query.getGroupByClause().isEmpty()) { |
| return super.executeQuery(query); |
| } |
| |
| if (!query.getHavingClause().isEmpty()) { |
| return super.executeQuery(query); |
| } |
| |
| final List<SelectItem> selectItems = query.getSelectClause().getItems(); |
| final StringBuilder sb = new StringBuilder(); |
| |
| try { |
| sb.append("SELECT "); |
| int i = 0; |
| final List<Column> columns = new ArrayList<>(selectItems.size()); |
| for (SelectItem selectItem : selectItems) { |
| validateSoqlSupportedSelectItem(selectItem); |
| columns.set(i,selectItem.getColumn()); |
| if (i != 0) { |
| sb.append(", "); |
| } |
| sb.append(columns.get(i).getName()); |
| i++; |
| } |
| |
| sb.append(" FROM "); |
| sb.append(table.getName()); |
| |
| boolean firstWhere = true; |
| for (FilterItem filterItem : query.getWhereClause().getItems()) { |
| if (firstWhere) { |
| sb.append(" WHERE "); |
| firstWhere = false; |
| } else { |
| sb.append(" AND "); |
| } |
| rewriteFilterItem(sb, filterItem); |
| } |
| |
| i = 0; |
| final List<OrderByItem> items = query.getOrderByClause().getItems(); |
| for (OrderByItem orderByItem : items) { |
| if (i == 0) { |
| sb.append(" ORDER BY "); |
| } else { |
| sb.append(", "); |
| } |
| |
| final SelectItem selectItem = orderByItem.getSelectItem(); |
| validateSoqlSupportedSelectItem(selectItem); |
| |
| final Column column = selectItem.getColumn(); |
| sb.append(column.getName()); |
| sb.append(' '); |
| sb.append(orderByItem.getDirection()); |
| |
| i++; |
| } |
| |
| final Integer firstRow = query.getFirstRow(); |
| final Integer maxRows = query.getMaxRows(); |
| if (maxRows != null && maxRows > 0) { |
| if (firstRow != null) { |
| // add first row / offset to avoid missing some records. |
| sb.append(" LIMIT " + (maxRows + firstRow - 1)); |
| } else { |
| sb.append(" LIMIT " + maxRows); |
| } |
| } |
| |
| final QueryResult result = executeSoqlQuery(sb.toString()); |
| |
| DataSet dataSet = new SalesforceDataSet(columns, result, _connection); |
| |
| if (firstRow != null) { |
| // OFFSET is still only a developer preview feature of SFDC. See |
| // http://www.salesforce.com/us/developer/docs/api/Content/sforce_api_calls_soql_select_offset.htm |
| dataSet = new FirstRowDataSet(dataSet, firstRow.intValue()); |
| } |
| |
| return dataSet; |
| |
| } catch (UnsupportedOperationException e) { |
| logger.debug("Failed to rewrite query to SOQL, falling back to regular query post-processing", e); |
| return super.executeQuery(query); |
| } |
| } |
| |
| @Override |
| protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) { |
| final String query; |
| |
| try { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("SELECT COUNT() FROM "); |
| sb.append(table.getName()); |
| |
| boolean firstWhere = true; |
| for (FilterItem filterItem : whereItems) { |
| if (firstWhere) { |
| sb.append(" WHERE "); |
| firstWhere = false; |
| } else { |
| sb.append(" AND "); |
| } |
| rewriteFilterItem(sb, filterItem); |
| } |
| query = sb.toString(); |
| } catch (UnsupportedOperationException e) { |
| logger.debug("Failed to rewrite count query, falling back to client side counting", e); |
| |
| // unable to rewrite to SOQL, counting will be done client side. |
| return null; |
| } |
| |
| final QueryResult queryResult = executeSoqlQuery(query); |
| |
| assert queryResult.isDone(); |
| |
| return queryResult.getSize(); |
| } |
| |
| protected static void rewriteFilterItem(StringBuilder sb, FilterItem filterItem) |
| throws UnsupportedOperationException { |
| if (filterItem.isCompoundFilter()) { |
| FilterItem[] childrend = filterItem.getChildItems(); |
| boolean firstChild = true; |
| sb.append('('); |
| for (FilterItem child : childrend) { |
| if (firstChild) { |
| firstChild = false; |
| } else { |
| sb.append(' '); |
| sb.append(filterItem.getLogicalOperator().toString()); |
| sb.append(' '); |
| } |
| rewriteFilterItem(sb, child); |
| } |
| sb.append(')'); |
| return; |
| } |
| |
| final SelectItem selectItem = filterItem.getSelectItem(); |
| validateSoqlSupportedSelectItem(selectItem); |
| |
| final Column column = selectItem.getColumn(); |
| sb.append(column.getName()); |
| sb.append(' '); |
| |
| final OperatorType operator = filterItem.getOperator(); |
| if (OperatorType.IN.equals(operator)) { |
| throw new UnsupportedOperationException("IN operator not supported: " + filterItem); |
| } |
| sb.append(operator.toSql()); |
| sb.append(' '); |
| |
| final Object operand = filterItem.getOperand(); |
| if (operand == null) { |
| sb.append("null"); |
| } else if (operand instanceof String) { |
| sb.append('\''); |
| |
| String str = operand.toString(); |
| str = str.replaceAll("\'", "\\\\'"); |
| str = str.replaceAll("\"", "\\\\\""); |
| str = str.replaceAll("\r", "\\\\r"); |
| str = str.replaceAll("\n", "\\\\n"); |
| str = str.replaceAll("\t", "\\\\t"); |
| |
| sb.append(str); |
| sb.append('\''); |
| } else if (operand instanceof Number) { |
| sb.append(operand); |
| } else if (operand instanceof Date) { |
| final SimpleDateFormat dateFormat; |
| ColumnType expectedColumnType = selectItem.getExpectedColumnType(); |
| if (expectedColumnType == ColumnType.DATE) { |
| // note: we don't apply the timezone for DATE fields, since they |
| // don't contain time-of-day information. |
| dateFormat = new SimpleDateFormat(SOQL_DATE_FORMAT_OUT); |
| } else if (expectedColumnType == ColumnType.TIME) { |
| dateFormat = new SimpleDateFormat(SOQL_TIME_FORMAT_OUT, Locale.ENGLISH); |
| dateFormat.setTimeZone(SOQL_TIMEZONE); |
| } else { |
| dateFormat = new SimpleDateFormat(SOQL_DATE_TIME_FORMAT_OUT, Locale.ENGLISH); |
| dateFormat.setTimeZone(SOQL_TIMEZONE); |
| } |
| |
| String str = dateFormat.format((Date) operand); |
| logger.debug("Date '{}' formatted as: {}", operand, str); |
| sb.append(str); |
| } else if (operand instanceof Column) { |
| sb.append(((Column) operand).getName()); |
| } else if (operand instanceof SelectItem) { |
| SelectItem operandSelectItem = (SelectItem) operand; |
| validateSoqlSupportedSelectItem(operandSelectItem); |
| sb.append(operandSelectItem.getColumn().getName()); |
| } else { |
| throw new UnsupportedOperationException("Unsupported operand: " + operand); |
| } |
| } |
| |
| private static void validateSoqlSupportedSelectItem(SelectItem selectItem) throws UnsupportedOperationException { |
| if (selectItem.hasFunction()) { |
| throw new UnsupportedOperationException("Function select items not supported: " + selectItem); |
| } |
| if (selectItem.getSubQuerySelectItem() != null) { |
| throw new UnsupportedOperationException("Subquery select items not supported: " + selectItem); |
| } |
| } |
| |
| @Override |
| protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("SELECT "); |
| for (int i = 0; i < columns.size(); i++) { |
| if (i != 0) { |
| sb.append(','); |
| } |
| sb.append(columns.get(i).getName()); |
| } |
| sb.append(" FROM "); |
| sb.append(table.getName()); |
| |
| if (maxRows > 0) { |
| sb.append(" LIMIT " + maxRows); |
| } |
| |
| final QueryResult queryResult = executeSoqlQuery(sb.toString()); |
| return new SalesforceDataSet(columns, queryResult, _connection); |
| } |
| |
| private QueryResult executeSoqlQuery(String query) { |
| logger.info("Executing SOQL query: {}", query); |
| try { |
| QueryResult queryResult = _connection.query(query); |
| return queryResult; |
| } catch (ConnectionException e) { |
| throw SalesforceUtils.wrapException(e, "Failed to invoke query service"); |
| } |
| } |
| |
| @Override |
| public UpdateSummary executeUpdate(UpdateScript update) { |
| final SalesforceUpdateCallback callback = new SalesforceUpdateCallback(this, _connection); |
| update.run(callback); |
| callback.close(); |
| return callback.getUpdateSummary(); |
| } |
| } |