blob: fb72d66dd7e2bcfe9b18c79dc2830bcadf01056e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
Settings as SettingsPB,
ClientType,
Publishing,
} from '../../proto/apache/rocketmq/v2/definition_pb';
import { Endpoints } from '../route';
import { ExponentialBackoffRetryPolicy } from '../retry';
import { Settings, UserAgent } from '../client';
import { createDuration } from '../util';
export class PublishingSettings extends Settings {
readonly #topics: Set<string>;
/**
* If message body size exceeds the threshold, it would be compressed for convenience of transport.
* https://rocketmq.apache.org/docs/introduction/03limits/
* Default max message size is 4 MB
*/
#maxBodySizeBytes = 4 * 1024 * 1024;
#validateMessageType = true;
constructor(clientId: string, accessPoint: Endpoints, retryPolicy: ExponentialBackoffRetryPolicy,
requestTimeout: number, topics: Set<string>) {
super(clientId, ClientType.PRODUCER, accessPoint, requestTimeout, retryPolicy);
this.#topics = topics;
}
get maxBodySizeBytes() {
return this.#maxBodySizeBytes;
}
isValidateMessageType() {
return this.#validateMessageType;
}
toProtobuf(): SettingsPB {
const publishing = new Publishing()
.setValidateMessageType(this.#validateMessageType);
for (const topic of this.#topics) {
publishing.addTopics().setName(topic);
}
return new SettingsPB()
.setClientType(this.clientType)
.setAccessPoint(this.accessPoint.toProtobuf())
.setRequestTimeout(createDuration(this.requestTimeout))
.setPublishing(publishing)
.setUserAgent(UserAgent.INSTANCE.toProtobuf());
}
sync(settings: SettingsPB): void {
if (settings.getPubSubCase() !== SettingsPB.PubSubCase.PUBLISHING) {
// log.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
// + "clientType={}", clientId, pubSubCase, clientType);
return;
}
const backoffPolicy = settings.getBackoffPolicy()!;
const publishing = settings.getPublishing()!.toObject();
const exist = this.retryPolicy!;
this.retryPolicy = exist.inheritBackoff(backoffPolicy);
this.#validateMessageType = publishing.validateMessageType;
this.#maxBodySizeBytes = publishing.maxBodySize;
}
}