| package org.apache.bookkeeper.benchmark; |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.util.Enumeration; |
| import java.util.HashMap; |
| |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.LedgerEntry; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.zookeeper.KeeperException; |
| |
| /** |
| * This is a simple test program to compare the performance of writing to |
| * BookKeeper and to the local file system. |
| * |
| */ |
| |
| public class TestClient |
| implements AddCallback, ReadCallback { |
| private static final Logger LOG = LoggerFactory.getLogger(TestClient.class); |
| |
| BookKeeper x; |
| LedgerHandle lh; |
| Integer entryId; |
| HashMap<Integer, Integer> map; |
| |
| FileOutputStream fStream; |
| FileOutputStream fStreamLocal; |
| long start, lastId; |
| |
| public TestClient() { |
| entryId = 0; |
| map = new HashMap<Integer, Integer>(); |
| } |
| |
| public TestClient(String servers) throws KeeperException, IOException, InterruptedException { |
| this(); |
| x = new BookKeeper(servers); |
| try { |
| lh = x.createLedger(DigestType.MAC, new byte[] {'a', 'b'}); |
| } catch (BKException e) { |
| LOG.error(e.toString()); |
| } |
| } |
| |
| public TestClient(String servers, int ensSize, int qSize) |
| throws KeeperException, IOException, InterruptedException { |
| this(); |
| x = new BookKeeper(servers); |
| try { |
| lh = x.createLedger(ensSize, qSize, DigestType.MAC, new byte[] {'a', 'b'}); |
| } catch (BKException e) { |
| LOG.error(e.toString()); |
| } |
| } |
| |
| public TestClient(FileOutputStream fStream) |
| throws FileNotFoundException { |
| this.fStream = fStream; |
| this.fStreamLocal = new FileOutputStream("./local.log"); |
| } |
| |
| |
| public Integer getFreshEntryId(int val) { |
| ++this.entryId; |
| synchronized (map) { |
| map.put(this.entryId, val); |
| } |
| return this.entryId; |
| } |
| |
| public boolean removeEntryId(Integer id) { |
| boolean retVal = false; |
| synchronized (map) { |
| map.remove(id); |
| retVal = true; |
| |
| if(map.size() == 0) map.notifyAll(); |
| else { |
| if(map.size() < 4) |
| LOG.error(map.toString()); |
| } |
| } |
| return retVal; |
| } |
| |
| public void closeHandle() throws KeeperException, InterruptedException, BKException { |
| lh.close(); |
| } |
| /** |
| * First says if entries should be written to BookKeeper (0) or to the local |
| * disk (1). Second parameter is an integer defining the length of a ledger entry. |
| * Third parameter is the number of writes. |
| * |
| * @param args |
| */ |
| public static void main(String[] args) { |
| |
| int lenght = Integer.parseInt(args[1]); |
| StringBuilder sb = new StringBuilder(); |
| while(lenght-- > 0) { |
| sb.append('a'); |
| } |
| |
| Integer selection = Integer.parseInt(args[0]); |
| switch(selection) { |
| case 0: |
| StringBuilder servers_sb = new StringBuilder(); |
| for (int i = 4; i < args.length; i++) { |
| servers_sb.append(args[i] + " "); |
| } |
| |
| String servers = servers_sb.toString().trim().replace(' ', ','); |
| try { |
| TestClient c = new TestClient(servers, Integer.parseInt(args[3]), Integer.parseInt(args[4])); |
| c.writeSameEntryBatch(sb.toString().getBytes(), Integer.parseInt(args[2])); |
| //c.writeConsecutiveEntriesBatch(Integer.parseInt(args[0])); |
| c.closeHandle(); |
| } catch (Exception e) { |
| LOG.error("Exception occurred", e); |
| } |
| break; |
| case 1: |
| |
| try { |
| TestClient c = new TestClient(new FileOutputStream(args[2])); |
| c.writeSameEntryBatchFS(sb.toString().getBytes(), Integer.parseInt(args[3])); |
| } catch(FileNotFoundException e) { |
| LOG.error("File not found", e); |
| } |
| break; |
| case 2: |
| break; |
| } |
| } |
| |
| void writeSameEntryBatch(byte[] data, int times) throws InterruptedException { |
| start = System.currentTimeMillis(); |
| int count = times; |
| LOG.debug("Data: " + new String(data) + ", " + data.length); |
| while(count-- > 0) { |
| lh.asyncAddEntry(data, this, this.getFreshEntryId(2)); |
| } |
| LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start)); |
| synchronized (map) { |
| if(map.size() != 0) |
| map.wait(); |
| } |
| LOG.debug("Finished processing in ms: " + (System.currentTimeMillis() - start)); |
| |
| LOG.debug("Ended computation"); |
| } |
| |
| void writeConsecutiveEntriesBatch(int times) throws InterruptedException { |
| start = System.currentTimeMillis(); |
| int count = times; |
| while(count-- > 0) { |
| byte[] write = new byte[2]; |
| int j = count%100; |
| int k = (count+1)%100; |
| write[0] = (byte) j; |
| write[1] = (byte) k; |
| lh.asyncAddEntry(write, this, this.getFreshEntryId(2)); |
| } |
| LOG.debug("Finished " + times + " async writes in ms: " + (System.currentTimeMillis() - start)); |
| synchronized (map) { |
| if(map.size() != 0) |
| map.wait(); |
| } |
| LOG.debug("Finished processing writes (ms): " + (System.currentTimeMillis() - start)); |
| |
| Integer mon = Integer.valueOf(0); |
| synchronized(mon) { |
| lh.asyncReadEntries(1, times - 1, this, mon); |
| mon.wait(); |
| } |
| LOG.error("Ended computation"); |
| } |
| |
| void writeSameEntryBatchFS(byte[] data, int times) { |
| int count = times; |
| LOG.debug("Data: " + data.length + ", " + times); |
| try { |
| start = System.currentTimeMillis(); |
| while(count-- > 0) { |
| fStream.write(data); |
| fStreamLocal.write(data); |
| fStream.flush(); |
| } |
| fStream.close(); |
| System.out.println("Finished processing writes (ms): " + (System.currentTimeMillis() - start)); |
| } catch(IOException e) { |
| LOG.error("IOException occurred", e); |
| } |
| } |
| |
| |
| @Override |
| public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { |
| this.removeEntryId((Integer) ctx); |
| } |
| |
| @Override |
| public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { |
| System.out.println("Read callback: " + rc); |
| while(seq.hasMoreElements()) { |
| LedgerEntry le = seq.nextElement(); |
| LOG.debug(new String(le.getEntry())); |
| } |
| synchronized(ctx) { |
| ctx.notify(); |
| } |
| } |
| } |