| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.util; |
| |
| import com.google.common.base.Objects; |
| import com.twitter.distributedlog.DistributedLogConstants; |
| import com.twitter.distributedlog.LogSegmentMetadata; |
| import com.twitter.distributedlog.exceptions.InvalidStreamNameException; |
| import com.twitter.distributedlog.exceptions.UnexpectedException; |
| import org.apache.commons.lang.StringUtils; |
| |
| import java.net.InetAddress; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.List; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| /** |
| * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization. |
| */ |
| public class DLUtils { |
| |
| /** |
| * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>. |
| * |
| * @param segments |
| * segments to search |
| * @param transactionId |
| * transaction id to find |
| * @return the first log segment whose transaction ids are not less than <code>transactionId</code>. |
| */ |
| public static int findLogSegmentNotLessThanTxnId(List<LogSegmentMetadata> segments, |
| long transactionId) { |
| int found = -1; |
| for (int i = segments.size() - 1; i >= 0; i--) { |
| LogSegmentMetadata segment = segments.get(i); |
| if (segment.getFirstTxId() <= transactionId) { |
| found = i; |
| break; |
| } |
| } |
| if (found <= -1) { |
| return -1; |
| } |
| if (found == 0 && segments.get(0).getFirstTxId() == transactionId) { |
| return 0; |
| } |
| LogSegmentMetadata foundSegment = segments.get(found); |
| if (foundSegment.getFirstTxId() == transactionId) { |
| for (int i = found - 1; i >= 0; i--) { |
| LogSegmentMetadata segment = segments.get(i); |
| if (segment.isInProgress()) { |
| break; |
| } |
| if (segment.getLastTxId() < transactionId) { |
| break; |
| } |
| found = i; |
| } |
| return found; |
| } else { |
| if (foundSegment.isInProgress() |
| || found == segments.size() - 1) { |
| return found; |
| } |
| if (foundSegment.getLastTxId() >= transactionId) { |
| return found; |
| } |
| return found + 1; |
| } |
| } |
| |
| /** |
| * Assign next log segment sequence number based on a decreasing list of log segments. |
| * |
| * @param segmentListDesc |
| * a decreasing list of log segments |
| * @return null if no log segments was assigned a sequence number in <code>segmentListDesc</code>. |
| * otherwise, return next log segment sequence number |
| */ |
| public static Long nextLogSegmentSequenceNumber(List<LogSegmentMetadata> segmentListDesc) { |
| int lastAssignedLogSegmentIdx = -1; |
| Long lastAssignedLogSegmentSeqNo = null; |
| Long nextLogSegmentSeqNo = null; |
| |
| for (int i = 0; i < segmentListDesc.size(); i++) { |
| LogSegmentMetadata metadata = segmentListDesc.get(i); |
| if (LogSegmentMetadata.supportsLogSegmentSequenceNo(metadata.getVersion())) { |
| lastAssignedLogSegmentSeqNo = metadata.getLogSegmentSequenceNumber(); |
| lastAssignedLogSegmentIdx = i; |
| break; |
| } |
| } |
| |
| if (null != lastAssignedLogSegmentSeqNo) { |
| // latest log segment is assigned with a sequence number, start with next sequence number |
| nextLogSegmentSeqNo = lastAssignedLogSegmentSeqNo + lastAssignedLogSegmentIdx + 1; |
| } |
| return nextLogSegmentSeqNo; |
| } |
| |
| /** |
| * Compute the start sequence id for <code>segment</code>, based on previous segment list |
| * <code>segmentListDesc</code>. |
| * |
| * @param logSegmentDescList |
| * list of segments in descending order |
| * @param segment |
| * segment to compute start sequence id for |
| * @return start sequence id |
| */ |
| public static long computeStartSequenceId(List<LogSegmentMetadata> logSegmentDescList, |
| LogSegmentMetadata segment) |
| throws UnexpectedException { |
| long startSequenceId = 0L; |
| for (LogSegmentMetadata metadata : logSegmentDescList) { |
| if (metadata.getLogSegmentSequenceNumber() >= segment.getLogSegmentSequenceNumber()) { |
| continue; |
| } else if (metadata.getLogSegmentSequenceNumber() < (segment.getLogSegmentSequenceNumber() - 1)) { |
| break; |
| } |
| if (metadata.isInProgress()) { |
| throw new UnexpectedException("Should not complete log segment " + segment.getLogSegmentSequenceNumber() |
| + " since it's previous log segment is still inprogress : " + logSegmentDescList); |
| } |
| if (metadata.supportsSequenceId()) { |
| startSequenceId = metadata.getStartSequenceId() + metadata.getRecordCount(); |
| } |
| } |
| return startSequenceId; |
| } |
| |
| /** |
| * Deserialize log segment sequence number for bytes <code>data</code>. |
| * |
| * @param data |
| * byte representation of log segment sequence number |
| * @return log segment sequence number |
| * @throws NumberFormatException if the bytes aren't valid |
| */ |
| public static long deserializeLogSegmentSequenceNumber(byte[] data) { |
| String seqNoStr = new String(data, UTF_8); |
| return Long.parseLong(seqNoStr); |
| } |
| |
| /** |
| * Serilize log segment sequence number <code>logSegmentSeqNo</code> into bytes. |
| * |
| * @param logSegmentSeqNo |
| * log segment sequence number |
| * @return byte representation of log segment sequence number |
| */ |
| public static byte[] serializeLogSegmentSequenceNumber(long logSegmentSeqNo) { |
| return Long.toString(logSegmentSeqNo).getBytes(UTF_8); |
| } |
| |
| /** |
| * Deserialize log record transaction id for bytes <code>data</code>. |
| * |
| * @param data |
| * byte representation of log record transaction id |
| * @return log record transaction id |
| * @throws NumberFormatException if the bytes aren't valid |
| */ |
| public static long deserializeTransactionId(byte[] data) { |
| String seqNoStr = new String(data, UTF_8); |
| return Long.parseLong(seqNoStr); |
| } |
| |
| /** |
| * Serilize log record transaction id <code>transactionId</code> into bytes. |
| * |
| * @param transactionId |
| * log record transaction id |
| * @return byte representation of log record transaction id. |
| */ |
| public static byte[] serializeTransactionId(long transactionId) { |
| return Long.toString(transactionId).getBytes(UTF_8); |
| } |
| |
| /** |
| * Serialize log segment id into bytes. |
| * |
| * @param logSegmentId |
| * log segment id |
| * @return bytes representation of log segment id |
| */ |
| public static byte[] logSegmentId2Bytes(long logSegmentId) { |
| return Long.toString(logSegmentId).getBytes(UTF_8); |
| } |
| |
| /** |
| * Deserialize bytes into log segment id. |
| * |
| * @param data |
| * bytes representation of log segment id |
| * @return log segment id |
| */ |
| public static long bytes2LogSegmentId(byte[] data) { |
| return Long.parseLong(new String(data, UTF_8)); |
| } |
| |
| /** |
| * Normalize the uri. |
| * |
| * @param uri the distributedlog uri. |
| * @return the normalized uri |
| */ |
| public static URI normalizeURI(URI uri) { |
| checkNotNull(uri, "DistributedLog uri is null"); |
| String scheme = uri.getScheme(); |
| checkNotNull(scheme, "Invalid distributedlog uri : " + uri); |
| scheme = scheme.toLowerCase(); |
| String[] schemeParts = StringUtils.split(scheme, '-'); |
| checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()), |
| "Unknown distributedlog scheme found : " + uri); |
| URI normalizedUri; |
| try { |
| normalizedUri = new URI( |
| schemeParts[0], // remove backend info |
| uri.getAuthority(), |
| uri.getPath(), |
| uri.getQuery(), |
| uri.getFragment()); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e); |
| } |
| return normalizedUri; |
| } |
| |
| private static String getHostIpLockClientId() { |
| try { |
| return InetAddress.getLocalHost().toString(); |
| } catch(Exception ex) { |
| return DistributedLogConstants.UNKNOWN_CLIENT_ID; |
| } |
| } |
| |
| /** |
| * Normalize the client id. |
| * |
| * @return the normalized client id. |
| */ |
| public static String normalizeClientId(String clientId) { |
| String normalizedClientId; |
| if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) { |
| normalizedClientId = getHostIpLockClientId(); |
| } else { |
| normalizedClientId = clientId; |
| } |
| return normalizedClientId; |
| } |
| |
| /** |
| * Is it a reserved stream name in bkdl namespace? |
| * |
| * @param name |
| * stream name |
| * @return true if it is reserved name, otherwise false. |
| */ |
| public static boolean isReservedStreamName(String name) { |
| return name.startsWith("."); |
| } |
| |
| /** |
| * Validate the stream name. |
| * |
| * @param nameOfStream |
| * name of stream |
| * @throws InvalidStreamNameException |
| */ |
| public static void validateName(String nameOfStream) |
| throws InvalidStreamNameException { |
| String reason = null; |
| char chars[] = nameOfStream.toCharArray(); |
| char c; |
| // validate the stream to see if meet zookeeper path's requirement |
| for (int i = 0; i < chars.length; i++) { |
| c = chars[i]; |
| |
| if (c == 0) { |
| reason = "null character not allowed @" + i; |
| break; |
| } else if (c == '/') { |
| reason = "'/' not allowed @" + i; |
| break; |
| } else if (c > '\u0000' && c < '\u001f' |
| || c > '\u007f' && c < '\u009F' |
| || c > '\ud800' && c < '\uf8ff' |
| || c > '\ufff0' && c < '\uffff') { |
| reason = "invalid charater @" + i; |
| break; |
| } |
| } |
| if (null != reason) { |
| throw new InvalidStreamNameException(nameOfStream, reason); |
| } |
| if (isReservedStreamName(nameOfStream)) { |
| throw new InvalidStreamNameException(nameOfStream, |
| "Stream Name is reserved"); |
| } |
| } |
| } |