blob: 509a97327bb739999134478751f5d55757a43e49 [file] [log] [blame]
package org.apache.dubbo.sample.tri;
import org.apache.dubbo.common.stream.StreamObserver;
import java.util.function.Function;
/**
* @author earthchen
* @date 2021/9/6
**/
public class EchoStreamObserver<T, R> implements StreamObserver<T> {
private final Function<T, R> echoFunc;
private final StreamObserver<R> responseObserver;
public EchoStreamObserver(Function<T, R> echoFunc, StreamObserver<R> responseObserver) {
this.echoFunc = echoFunc;
this.responseObserver = responseObserver;
}
@Override
public void onNext(T data) {
responseObserver.onNext(echoFunc.apply(data));
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
responseObserver.onError(new IllegalStateException("Stream err"));
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
}