blob: 29d743ce96e02d953b7e893a52b0212489da9379 [file] [log] [blame]
using DotPulsar.Exceptions;
using DotPulsar.Internal.Exceptions;
using DotPulsar.Internal.PulsarApi;
namespace DotPulsar.Internal.Extensions
{
public static class CommandExtensions
{
public static void Expect(this BaseCommand command, params BaseCommand.Type[] types)
{
var actual = command.CommandType;
foreach (var type in types)
{
if (type == actual)
return;
}
switch (actual)
{
case BaseCommand.Type.Error:
command.Error.Throw();
return;
case BaseCommand.Type.SendError:
command.SendError.Throw();
return;
}
throw new UnexpectedResponseException($"Expected '{string.Join(",", types)}' but got '{actual}'");
}
public static void Throw(this CommandSendError command) => Throw(command.Error, command.Message);
public static void Throw(this CommandLookupTopicResponse command) => Throw(command.Error, command.Message);
public static void Throw(this CommandError error) => Throw(error.Error, error.Message);
private static void Throw(ServerError error, string message)
{
switch (error)
{
case ServerError.AuthenticationError: throw new AuthenticationException(message);
case ServerError.AuthorizationError: throw new AuthorizationException(message);
case ServerError.ChecksumError: throw new ChecksumException(message);
case ServerError.ConsumerAssignError: throw new ConsumerAssignException(message);
case ServerError.ConsumerBusy: throw new ConsumerBusyException(message);
case ServerError.ConsumerNotFound: throw new ConsumerNotFoundException(message);
case ServerError.IncompatibleSchema: throw new IncompatibleSchemaException(message);
case ServerError.InvalidTopicName: throw new InvalidTopicNameException(message);
case ServerError.MetadataError: throw new MetadataException(message);
case ServerError.PersistenceError: throw new PersistenceException(message);
case ServerError.ProducerBlockedQuotaExceededError:
case ServerError.ProducerBlockedQuotaExceededException:
throw new ProducerBlockedQuotaExceededException(message + ". Error code: " + error);
case ServerError.ProducerBusy: throw new ProducerBusyException(message);
case ServerError.ServiceNotReady: throw new ServiceNotReadyException(message);
case ServerError.SubscriptionNotFound: throw new SubscriptionNotFoundException(message);
case ServerError.TooManyRequests: throw new TooManyRequestsException(message);
case ServerError.TopicNotFound: throw new TopicNotFoundException(message);
case ServerError.TopicTerminatedError: throw new TopicTerminatedException(message);
case ServerError.UnsupportedVersionError: throw new UnsupportedVersionException(message);
case ServerError.UnknownError:
default: throw new UnknownException(message + ". Error code: " + error);
}
}
public static BaseCommand AsBaseCommand(this CommandAck command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Ack,
Ack = command
};
}
public static BaseCommand AsBaseCommand(this CommandConnect command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Connect,
Connect = command
};
}
public static BaseCommand AsBaseCommand(this CommandPing command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Ping,
Ping = command
};
}
public static BaseCommand AsBaseCommand(this CommandPong command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Pong,
Pong = command
};
}
public static BaseCommand AsBaseCommand(this CommandProducer command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Producer,
Producer = command
};
}
public static BaseCommand AsBaseCommand(this CommandGetLastMessageId command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.GetLastMessageId,
GetLastMessageId = command
};
}
public static BaseCommand AsBaseCommand(this CommandUnsubscribe command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Unsubscribe,
Unsubscribe = command
};
}
public static BaseCommand AsBaseCommand(this CommandSubscribe command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Subscribe,
Subscribe = command
};
}
public static BaseCommand AsBaseCommand(this CommandLookupTopic command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Lookup,
LookupTopic = command
};
}
public static BaseCommand AsBaseCommand(this CommandSend command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Send,
Send = command
};
}
public static BaseCommand AsBaseCommand(this CommandFlow command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Flow,
Flow = command
};
}
public static BaseCommand AsBaseCommand(this CommandCloseProducer command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.CloseProducer,
CloseProducer = command
};
}
public static BaseCommand AsBaseCommand(this CommandCloseConsumer command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.CloseConsumer,
CloseConsumer = command
};
}
public static BaseCommand AsBaseCommand(this CommandSeek command)
{
return new BaseCommand
{
CommandType = BaseCommand.Type.Seek,
Seek = command
};
}
}
}