blob: 13cb6984fafe10174368f51cc9fcce3550248cd3 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.dubbo.samples.impl;
import java.util.HashMap;
import java.util.Map;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.protocol.tri.ServerStreamObserver;
import org.apache.dubbo.samples.DubboGreeterTriple;
import org.apache.dubbo.samples.GreeterReply;
import org.apache.dubbo.samples.GreeterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author conghuhu
*/
@DubboService(version = "1.0.0")
public class GreeterImpl extends DubboGreeterTriple.GreeterImplBase {
private static final Logger LOGGER = LoggerFactory.getLogger(GreeterImpl.class);
private final String serverName;
public static final Map<String, Boolean> cancelResultMap = new HashMap<>();
public GreeterImpl() {
this.serverName = "test dubbo tri mesh";
}
public GreeterImpl(String serverName) {
this.serverName = serverName;
}
@Override
public GreeterReply greet(GreeterRequest request) {
LOGGER.info("Server {} received greet request {}", serverName, request);
boolean isProviderSide = RpcContext.getContext().isProviderSide();
String clientIP = RpcContext.getContext().getRemoteHost();
String remoteApplication = RpcContext.getContext().getRemoteApplicationName();
String application = RpcContext.getContext().getUrl().getParameter("application");
String protocol = RpcContext.getContext().getProtocol();
return GreeterReply.newBuilder()
.setMessage("hello," + request.getName() + ", response from provider-v1: " + RpcContext.getContext().getLocalAddress() +
", client: " + clientIP + ", local: " + application + ", remote: " + remoteApplication +
", isProviderSide: " + isProviderSide)
.build();
}
@Override
public GreeterReply greetWithAttachment(GreeterRequest request) {
final String key = "user-attachment";
final String value = "hello," + RpcContext.getServerAttachment().getAttachment(key);
RpcContext.getServerContext().setObjectAttachment(key, value);
return GreeterReply.newBuilder().setMessage("hello," + request.getName()).build();
}
@Override
public GreeterReply greetReturnBigAttachment(GreeterRequest request) {
final String key = "user-attachment";
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 8000; i++) {
stringBuilder.append(i);
}
String value = stringBuilder.toString();
RpcContext.getServerContext().setObjectAttachment(key, value);
return GreeterReply.newBuilder().setMessage("hello," + request.getName()).build();
}
@Override
public void cancelServerStream(GreeterRequest request,
StreamObserver<GreeterReply> replyStream) {
RpcContext.getCancellationContext().addListener(context -> {
LOGGER.info("cancel--cancelServerStream");
cancelResultMap.put("cancelServerStream", true);
});
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName() + "--" + i)
.build());
}
}
@Override
public StreamObserver<GreeterRequest> cancelBiStream(StreamObserver<GreeterReply> replyStream) {
RpcContext.getCancellationContext()
.addListener(context -> {
LOGGER.info("cancel--cancelBiStream");
cancelResultMap.put("cancelBiStream", true);
});
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
LOGGER.info("Bi-Stream-Request:" + data.getName());
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
replyStream.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
// replyStream.onCompleted();
}
};
}
@Override
public StreamObserver<GreeterRequest> cancelBiStream2(
StreamObserver<GreeterReply> replyStream) {
RpcContext.getCancellationContext()
.addListener(context -> {
LOGGER.info("cancel--cancelBiStream2");
cancelResultMap.put("cancelBiStream2", true);
});
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
LOGGER.info("Bi-Stream-Request:" + data.getName());
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
replyStream.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
// replyStream.onCompleted();
}
};
}
@Override
public StreamObserver<GreeterRequest> compressorBiStream(
StreamObserver<GreeterReply> replyStream) {
ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
replyServerStreamObserver.setCompression("gzip");
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
private StreamObserver<GreeterRequest> getGreeterRequestStreamObserver(
StreamObserver<GreeterReply> streamObserver) {
return new StreamObserver<GreeterRequest>() {
@Override
public void onNext(GreeterRequest data) {
streamObserver.onNext(GreeterReply.newBuilder()
.setMessage(data.getName())
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
streamObserver.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
streamObserver.onCompleted();
}
};
}
@Override
public StreamObserver<GreeterRequest> clientCompressorBiStream(
StreamObserver<GreeterReply> replyStream) {
ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
@Override
public StreamObserver<GreeterRequest> serverCompressorBiStream(
StreamObserver<GreeterReply> replyStream) {
ServerStreamObserver<GreeterReply> replyServerStreamObserver = (ServerStreamObserver<GreeterReply>) replyStream;
replyServerStreamObserver.setCompression("gzip");
return getGreeterRequestStreamObserver(replyServerStreamObserver);
}
@Override
public GreeterReply queryCancelResult(GreeterRequest request) {
boolean canceled = cancelResultMap.getOrDefault(request.getName(), false);
return GreeterReply.newBuilder()
.setMessage(String.valueOf(canceled))
.build();
}
public GreeterReply greetException(GreeterRequest request) {
RpcContext.getServerContext().setAttachment("str", "str")
.setAttachment("integer", 1)
.setAttachment("raw", new byte[] {1, 2, 3, 4});
throw new RuntimeException("Biz Exception");
}
@Override
public StreamObserver<GreeterRequest> greetStream(StreamObserver<GreeterReply> replyStream) {
return new StreamObserver<GreeterRequest>() {
int n = 0;
@Override
public void onNext(GreeterRequest data) {
n++;
LOGGER.info(data.getName() + " " + n);
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(data.getName() + " " + n)
.build());
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
replyStream.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
LOGGER.info("[greetStream] onCompleted");
replyStream.onCompleted();
}
};
}
@Override
public void greetServerStream(GreeterRequest request,
StreamObserver<GreeterReply> replyStream) {
for (int i = 0; i < 10; i++) {
replyStream.onNext(GreeterReply.newBuilder()
.setMessage(request.getName() + "--" + i)
.build());
}
replyStream.onCompleted();
}
}