blob: 5ea6e8dab776d3756026f2ac6f42593037ee83ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.hadoop.impl.igfs;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.igfs.common.IgfsControlResponse;
import org.apache.ignite.internal.igfs.common.IgfsHandshakeRequest;
import org.apache.ignite.internal.igfs.common.IgfsMessage;
import org.apache.ignite.internal.igfs.common.IgfsModeResolverRequest;
import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest;
import org.apache.ignite.internal.igfs.common.IgfsStatusRequest;
import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
import org.apache.ignite.internal.processors.igfs.IgfsInputStreamDescriptor;
import org.apache.ignite.internal.processors.igfs.IgfsModeResolver;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.lang.IgniteClosure;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.AFFINITY;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.CLOSE;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.DELETE;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.INFO;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_FILES;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.LIST_PATHS;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.MAKE_DIRECTORIES;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_APPEND;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_CREATE;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.OPEN_READ;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.PATH_SUMMARY;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.READ_BLOCK;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.RENAME;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.SET_TIMES;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.UPDATE;
import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.WRITE_BLOCK;
/**
* Communication with external process (TCP or shmem).
*/
public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
/** Expected result is boolean. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
/** Expected result is boolean. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, Long> LONG_RES = createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
/** Expected result is {@code IgfsHandshakeResponse} */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
/** Expected result is {@code IgfsStatus} */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
Collection<IgfsFile>> FILE_COL_RES = createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
Collection<IgfsPath>> PATH_COL_RES = createClosure();
/** Expected result is {@code IgfsPathSummary}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
/** Expected result is {@code IgfsFile}. */
private static final IgniteClosure<IgniteInternalFuture<IgfsMessage>,
IgfsModeResolver> MODE_RESOLVER_RES = createClosure();
/** IGFS name. */
private final String igfs;
/** The user this out proc is performing on behalf of. */
private final String userName;
/** Client log. */
private final Log log;
/** Client IO. */
private final HadoopIgfsIpcIo io;
/** Event listeners. */
private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap<>();
/**
* Constructor for TCP endpoint.
*
* @param host Host.
* @param port Port.
* @param igfs IGFS name.
* @param log Client logger.
* @param user User name.
* @throws IOException If failed.
*/
public HadoopIgfsOutProc(String host, int port, String igfs, Log log, String user) throws IOException {
this(host, port, igfs, false, log, user);
}
/**
* Constructor for shmem endpoint.
*
* @param port Port.
* @param igfs IGFS name.
* @param log Client logger.
* @param user User name.
* @throws IOException If failed.
*/
public HadoopIgfsOutProc(int port, String igfs, Log log, String user) throws IOException {
this(null, port, igfs, true, log, user);
}
/**
* Constructor.
*
* @param host Host.
* @param port Port.
* @param igfs IGFS name.
* @param shmem Shared memory flag.
* @param log Client logger.
* @param user User name.
* @throws IOException If failed.
*/
private HadoopIgfsOutProc(String host, int port, String igfs, boolean shmem, Log log, String user)
throws IOException {
assert host != null && !shmem || host == null && shmem :
"Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
String endpoint = host != null ? host + ":" + port : "shmem:" + port;
this.igfs = igfs;
this.log = log;
this.userName = IgfsUtils.fixUserName(user);
io = HadoopIgfsIpcIo.get(log, endpoint);
io.addEventListener(this);
}
/** {@inheritDoc} */
@Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
req.igfsName(igfs);
req.logDirectory(logDir);
return io.send(req).chain(HANDSHAKE_RES).get();
}
/** {@inheritDoc} */
@Override public void close(boolean force) {
assert io != null;
io.removeEventListener(this);
if (force)
io.forceClose();
else
io.release();
}
/** {@inheritDoc} */
@Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(INFO);
msg.path(path);
msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
/** {@inheritDoc} */
@Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(UPDATE);
msg.path(path);
msg.properties(props);
msg.userName(userName);
return io.send(msg).chain(FILE_RES).get();
}
/** {@inheritDoc} */
@Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime)
throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(SET_TIMES);
msg.path(path);
msg.accessTime(accessTime);
msg.modificationTime(modificationTime);
msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
/** {@inheritDoc} */
@Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(RENAME);
msg.path(src);
msg.destinationPath(dest);
msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
/** {@inheritDoc} */
@Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(DELETE);
msg.path(path);
msg.flag(recursive);
msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
/** {@inheritDoc} */
@Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(AFFINITY);
msg.path(path);
msg.start(start);
msg.length(len);
msg.userName(userName);
return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
}
/** {@inheritDoc} */
@Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(PATH_SUMMARY);
msg.path(path);
msg.userName(userName);
return io.send(msg).chain(SUMMARY_RES).get();
}
/** {@inheritDoc} */
@Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(MAKE_DIRECTORIES);
msg.path(path);
msg.properties(props);
msg.userName(userName);
return io.send(msg).chain(BOOL_RES).get();
}
/** {@inheritDoc} */
@Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(LIST_FILES);
msg.path(path);
msg.userName(userName);
return io.send(msg).chain(FILE_COL_RES).get();
}
/** {@inheritDoc} */
@Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(LIST_PATHS);
msg.path(path);
msg.userName(userName);
return io.send(msg).chain(PATH_COL_RES).get();
}
/** {@inheritDoc} */
@Override public IgfsStatus fsStatus() throws IgniteCheckedException {
return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
}
/** {@inheritDoc} */
@Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(OPEN_READ);
msg.path(path);
msg.flag(false);
msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
}
/** {@inheritDoc} */
@Override public HadoopIgfsStreamDelegate open(IgfsPath path,
int seqReadsBeforePrefetch) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(OPEN_READ);
msg.path(path);
msg.flag(true);
msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
msg.userName(userName);
IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
}
/** {@inheritDoc} */
@Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(OPEN_CREATE);
msg.path(path);
msg.flag(overwrite);
msg.colocate(colocate);
msg.properties(props);
msg.replication(replication);
msg.blockSize(blockSize);
msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
return new HadoopIgfsStreamDelegate(this, streamId);
}
/** {@inheritDoc} */
@Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
@Nullable Map<String, String> props) throws IgniteCheckedException {
final IgfsPathControlRequest msg = new IgfsPathControlRequest();
msg.command(OPEN_APPEND);
msg.path(path);
msg.flag(create);
msg.properties(props);
msg.userName(userName);
Long streamId = io.send(msg).chain(LONG_RES).get();
return new HadoopIgfsStreamDelegate(this, streamId);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
@Nullable final byte[] outBuf, final int outOff, final int outLen) {
assert len > 0;
final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
msg.command(READ_BLOCK);
msg.streamId((long) desc.target());
msg.position(pos);
msg.length(len);
try {
return io.send(msg, outBuf, outOff, outLen);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
/** {@inheritDoc} */
@Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
throws IOException {
final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
msg.command(WRITE_BLOCK);
msg.streamId((long) desc.target());
msg.data(data);
msg.position(off);
msg.length(len);
try {
io.sendPlain(msg);
}
catch (IgniteCheckedException e) {
throw HadoopIgfsUtils.cast(e);
}
}
/** {@inheritDoc} */
@Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
// No-op.
}
/** {@inheritDoc} */
@Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
msg.command(CLOSE);
msg.streamId((long)desc.target());
try {
io.send(msg).chain(BOOL_RES).get();
}
catch (IgniteCheckedException e) {
throw HadoopIgfsUtils.cast(e);
}
}
/** {@inheritDoc} */
@Override public void addEventListener(HadoopIgfsStreamDelegate desc,
HadoopIgfsStreamEventListener lsnr) {
long streamId = desc.target();
HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
assert lsnr0 == null || lsnr0 == lsnr;
if (log.isDebugEnabled())
log.debug("Added stream event listener [streamId=" + streamId + ']');
}
/** {@inheritDoc} */
@Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
long streamId = desc.target();
HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
if (lsnr0 != null && log.isDebugEnabled())
log.debug("Removed stream event listener [streamId=" + streamId + ']');
}
/** {@inheritDoc} */
@Override public void onClose() {
for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
try {
lsnr.onClose();
}
catch (IgniteCheckedException e) {
log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
}
}
}
/** {@inheritDoc} */
@Override public void onError(long streamId, String errMsg) {
HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
if (lsnr != null)
lsnr.onError(errMsg);
else
log.warn("Received write error response for not registered output stream (will ignore) " +
"[streamId= " + streamId + ']');
}
/**
* Creates conversion closure for given type.
*
* @param <T> Type of expected result.
* @return Conversion closure.
*/
@SuppressWarnings("unchecked")
private static <T> IgniteClosure<IgniteInternalFuture<IgfsMessage>, T> createClosure() {
return new IgniteClosure<IgniteInternalFuture<IgfsMessage>, T>() {
@Override public T apply(IgniteInternalFuture<IgfsMessage> fut) {
try {
IgfsControlResponse res = (IgfsControlResponse)fut.get();
if (res.hasError())
res.throwError();
return (T)res.response();
}
catch (IgfsException | IgniteCheckedException e) {
throw new GridClosureException(e);
}
}
};
}
/** {@inheritDoc} */
@Override public String user() {
return userName;
}
/** {@inheritDoc} */
@Override public IgfsModeResolver modeResolver() throws IgniteCheckedException {
return io.send(new IgfsModeResolverRequest()).chain(MODE_RESOLVER_RES).get();
}
}