| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.amqp_1_0.client; |
| |
| import org.apache.qpid.amqp_1_0.type.*; |
| import org.apache.qpid.amqp_1_0.type.messaging.*; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Options; |
| |
| import java.io.*; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| public class Filereceiver extends Util |
| { |
| private static final String USAGE_STRING = "filereceiver [options] <address> <directory>\n\nOptions:"; |
| |
| protected Filereceiver(String[] args) |
| { |
| super(args); |
| } |
| |
| @Override |
| protected boolean hasLinkDurableOption() |
| { |
| return true; |
| } |
| |
| @Override |
| protected boolean hasLinkNameOption() |
| { |
| return true; |
| } |
| |
| @Override |
| protected boolean hasResponseQueueOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected boolean hasSizeOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected boolean hasBlockOption() |
| { |
| return true; |
| } |
| |
| @Override |
| protected boolean hasStdInOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected boolean hasTxnOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected boolean hasModeOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected boolean hasCountOption() |
| { |
| return false; |
| } |
| |
| @Override |
| protected void printUsage(Options options) |
| { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp(USAGE_STRING, options ); |
| |
| } |
| |
| @Override |
| protected void run() |
| { |
| final String queue = getArgs()[0]; |
| final String directoryName = getArgs()[1]; |
| |
| try |
| { |
| Connection conn = newConnection(); |
| |
| Session session = conn.createSession(); |
| |
| final File directory = new File(directoryName); |
| if(directory.isDirectory() && directory.canWrite()) |
| { |
| File tmpDirectory = new File(directoryName, ".tmp"); |
| if(!tmpDirectory.exists()) |
| { |
| tmpDirectory.mkdir(); |
| } |
| |
| String[] unsettledFiles = tmpDirectory.list(); |
| |
| Map<Binary, Outcome> unsettled = new HashMap<Binary, Outcome>(); |
| final Map<Binary, String> unsettledFileNames = new HashMap<Binary, String>(); |
| |
| Accepted accepted = new Accepted(); |
| |
| for(String fileName : unsettledFiles) |
| { |
| File theFile = new File(tmpDirectory, fileName); |
| if(theFile.isFile()) |
| { |
| if(fileName.startsWith("~") && fileName.endsWith("~")) |
| { |
| theFile.delete(); |
| } |
| else |
| { |
| int splitPoint = fileName.indexOf("."); |
| String deliveryTagStr = fileName.substring(0,splitPoint); |
| String actualFileName = fileName.substring(splitPoint+1); |
| |
| byte[] bytes = new byte[deliveryTagStr.length()/2]; |
| |
| |
| for(int i = 0; i < bytes.length; i++) |
| { |
| char c = deliveryTagStr.charAt(2*i); |
| char d = deliveryTagStr.charAt(1+(2*i)); |
| |
| bytes[i] = (byte) (((c <= '9' ? c - '0' : c - 'W') << 4) |
| | (d <= '9' ? d - '0' : d - 'W')); |
| |
| } |
| Binary deliveryTag = new Binary(bytes); |
| unsettled.put(deliveryTag, accepted); |
| unsettledFileNames.put(deliveryTag, fileName); |
| } |
| } |
| |
| } |
| |
| Receiver r = session.createReceiver(queue, AcknowledgeMode.EO, getLinkName(), isDurableLink(), |
| unsettled); |
| |
| Map<Binary, Outcome> remoteUnsettled = r.getRemoteUnsettled(); |
| |
| for(Map.Entry<Binary, String> entry : unsettledFileNames.entrySet()) |
| { |
| if(remoteUnsettled == null || !remoteUnsettled.containsKey(entry.getKey())) |
| { |
| |
| File tmpFile = new File(tmpDirectory, entry.getValue()); |
| final File dest = new File(directory, |
| entry.getValue().substring(entry.getValue().indexOf(".") + 1)); |
| if(dest.exists()) |
| { |
| System.err.println("Duplicate detected - filename " + dest.getName()); |
| } |
| |
| tmpFile.renameTo(dest); |
| } |
| } |
| |
| |
| int credit = 10; |
| |
| r.setCredit(UnsignedInteger.valueOf(credit), true); |
| |
| |
| int received = 0; |
| Message m = null; |
| do |
| { |
| m = isBlock() && received == 0 ? r.receive() : r.receive(10000); |
| if(m != null) |
| { |
| if(m.isResume() && unsettled.containsKey(m.getDeliveryTag())) |
| { |
| final String tmpFileName = unsettledFileNames.get(m.getDeliveryTag()); |
| final File unsettledFile = new File(tmpDirectory, |
| tmpFileName); |
| r.acknowledge(m, new Receiver.SettledAction() |
| { |
| public void onSettled(final Binary deliveryTag) |
| { |
| int splitPoint = tmpFileName.indexOf("."); |
| |
| String fileName = tmpFileName.substring(splitPoint+1); |
| |
| final File dest = new File(directory, fileName); |
| if(dest.exists()) |
| { |
| System.err.println("Duplicate detected - filename " + dest.getName()); |
| } |
| unsettledFile.renameTo(dest); |
| unsettledFileNames.remove(deliveryTag); |
| } |
| }); |
| } |
| else |
| { |
| received++; |
| List<Section> sections = m.getPayload(); |
| Binary deliveryTag = m.getDeliveryTag(); |
| StringBuilder tagNameBuilder = new StringBuilder(); |
| |
| ByteBuffer dtbuf = deliveryTag.asByteBuffer(); |
| while(dtbuf.hasRemaining()) |
| { |
| tagNameBuilder.append(String.format("%02x", dtbuf.get())); |
| } |
| |
| |
| ApplicationProperties properties = null; |
| List<Binary> data = new ArrayList<Binary>(); |
| int totalSize = 0; |
| for(Section section : sections) |
| { |
| if(section instanceof ApplicationProperties) |
| { |
| properties = (ApplicationProperties) section; |
| } |
| else if(section instanceof AmqpValue) |
| { |
| AmqpValue value = (AmqpValue) section; |
| if(value.getValue() instanceof Binary) |
| { |
| Binary binary = (Binary) value.getValue(); |
| data.add(binary); |
| totalSize += binary.getLength(); |
| |
| } |
| else |
| { |
| // TODO exception |
| } |
| } |
| else if(section instanceof Data) |
| { |
| Data value = (Data) section; |
| Binary binary = value.getValue(); |
| data.add(binary); |
| totalSize += binary.getLength(); |
| |
| } |
| } |
| if(properties != null) |
| { |
| final String fileName = (String) properties.getValue().get("filename"); |
| byte[] fileData = new byte[totalSize]; |
| ByteBuffer buf = ByteBuffer.wrap(fileData); |
| int offset = 0; |
| for(Binary bin : data) |
| { |
| buf.put(bin.asByteBuffer()); |
| } |
| File outputFile = new File(tmpDirectory, "~"+fileName+"~"); |
| if(outputFile.exists()) |
| { |
| outputFile.delete(); |
| } |
| FileOutputStream fos = new FileOutputStream(outputFile); |
| fos.write(fileData); |
| fos.flush(); |
| fos.close(); |
| |
| final File unsettledFile = new File(tmpDirectory, tagNameBuilder.toString() + "." + |
| fileName); |
| outputFile.renameTo(unsettledFile); |
| r.acknowledge(m, new Receiver.SettledAction() |
| { |
| public void onSettled(final Binary deliveryTag) |
| { |
| final File dest = new File(directory, fileName); |
| if(dest.exists()) |
| { |
| System.err.println("Duplicate detected - filename " + dest.getName()); |
| } |
| unsettledFile.renameTo(dest); |
| |
| } |
| }); |
| |
| } |
| } |
| } |
| } |
| while(m != null); |
| |
| |
| r.close(); |
| } |
| else |
| { |
| System.err.println("No such directory: " + directoryName); |
| } |
| session.close(); |
| conn.close(); |
| } |
| catch (Connection.ConnectionException e) |
| { |
| e.printStackTrace(); |
| } |
| catch (FileNotFoundException e) |
| { |
| e.printStackTrace(); //TODO. |
| } |
| catch (IOException e) |
| { |
| e.printStackTrace(); //TODO. |
| } |
| catch (AmqpErrorException e) |
| { |
| e.printStackTrace(); //TODO. |
| } |
| |
| } |
| |
| public static void main(String[] args) |
| { |
| new Filereceiver(args).run(); |
| } |
| } |