Compare commits

..

18 Commits
v0.43 ... v0.49

Author SHA1 Message Date
Moxie Marlinspike
c6810d7460 Bump version to 0.49
// FREEBIE
2015-05-13 14:31:38 -07:00
Moxie Marlinspike
4c1e7e7c2f Rate limit messages on source+destination rather than just src.
// FREEBIE
2015-04-24 16:25:59 -07:00
Moxie Marlinspike
931081752a Updated sample config
// FREEBIE

Closes #38
2015-04-21 19:45:31 -07:00
Moxie Marlinspike
424e98e67e Bump version to 0.48
// FREEBIE
2015-04-21 19:44:20 -07:00
Moxie Marlinspike
7cfa93f5f8 Tone down websocket logging for bad federated responses.
// FREEBIE
2015-04-21 19:44:02 -07:00
Moxie Marlinspike
fd8e8d1475 Catch WebApplicationException inside WebsocketConnection.
// FREEBIE
2015-04-16 11:33:16 -07:00
Moxie Marlinspike
7ed5eb22ec Additional WebsocketConnection test.
// FREEBIE
2015-04-16 10:45:24 -07:00
Moxie Marlinspike
fa1c275904 Bump version to 0.46
// FREEBIE
2015-04-15 16:44:22 -07:00
Moxie Marlinspike
558c72bbb7 Make pending messages indexable by sender and timestamp.
Rather than just timestamp.

// FREEBIE
2015-04-15 16:43:44 -07:00
Moxie Marlinspike
37976455bc Bump version to 0.45
// FREEBIE
2015-04-15 16:20:25 -07:00
Moxie Marlinspike
db6ee8f687 Make stored messages REST accessible.
Add REST endpoints for retrieving and acknowledging pending
messges, beyond the WebSocket.

// FREEBIE
2015-04-15 16:19:07 -07:00
Moxie Marlinspike
53e7ffa311 Bump version to 0.44 2015-03-30 10:31:35 -07:00
Moxie Marlinspike
e0f7ff325a Add voip push support in communication with push server.
// FREEBIE
2015-03-25 15:59:52 -07:00
Moxie Marlinspike
1fcd1e33c5 Log keepalives for unsubscribed channels.
// FREEBIE
2015-03-24 14:13:32 -07:00
Moxie Marlinspike
843b16c1f0 Correctly serialize provisioning addresses.
// FREEBIE
2015-03-24 12:10:59 -07:00
Moxie Marlinspike
a58f3f0fe3 Check subscription status on websocket keepalive.
// FREEBIE
2015-03-24 10:48:14 -07:00
Moxie Marlinspike
e69e395b25 Support for receiving "canonical id" update events from pushserver.
// FREEBIE
2015-03-24 10:47:45 -07:00
Moxie Marlinspike
456164fc24 Support registering a 'voip' APN ID.
// FREEBIE
2015-03-24 10:47:20 -07:00
30 changed files with 656 additions and 253 deletions

View File

@@ -1,32 +1,16 @@
twilio:
accountId:
twilio: # Twilio SMS gateway configuration
accountId:
accountToken:
number:
localDomain: # The domain Twilio can call back to.
international: # Boolean specifying Twilio for international delivery
# Optional. If specified, Nexmo will be used for non-US SMS and
# voice verification if twilio.international is false. Otherwise,
# Nexmo, if specified, Nexmo will only be used as a fallback
# for failed Twilio deliveries.
nexmo:
apiKey:
apiSecret:
number:
push: # GCM/APN push server configuration
host:
port:
username:
password:
gcm:
senderId:
apiKey:
# Optional. Only if iOS clients are supported.
apn:
# In PEM format.
certificate:
# In PEM format.
key:
s3:
s3: # AWS S3 configuration
accessKey:
accessSecret:
@@ -35,13 +19,37 @@ s3:
# correct permissions.
attachmentsBucket:
memcache:
servers:
user:
password:
directory: # Redis server configuration for TS directory
url:
redis:
url:
cache: # Redis server configuration for general purpose caching
url:
websocket:
enabled: true
messageStore: # Postgres database configuration for message store
driverClass: org.postgresql.Driver
user:
password:
url:
database: # Postgres database configuration for account store
# the name of your JDBC driver
driverClass: org.postgresql.Driver
# the username
user:
# the password
password:
# the JDBC URL
url: jdbc:postgresql://somehost:somport/somedb
# any properties specific to your JDBC driver:
properties:
charSet: UTF-8
federation:
name:
@@ -52,24 +60,3 @@ federation:
authenticationToken: foo
certificate: in pem format
# Optional address of graphite server to report metrics
graphite:
host:
port:
database:
# the name of your JDBC driver
driverClass: org.postgresql.Driver
# the username
user:
# the password
password:
# the JDBC URL
url: jdbc:postgresql://somehost:somport/somedb
# any properties specific to your JDBC driver:
properties:
charSet: UTF-8

View File

@@ -9,7 +9,7 @@
<groupId>org.whispersystems.textsecure</groupId>
<artifactId>TextSecureServer</artifactId>
<version>0.43</version>
<version>0.49</version>
<properties>
<dropwizard.version>0.7.1</dropwizard.version>

View File

@@ -75,6 +75,10 @@ public class DispatchManager extends Thread {
}
}
public boolean hasSubscription(String name) {
return subscriptions.containsKey(name);
}
@Override
public void run() {
while (running) {

View File

@@ -58,6 +58,7 @@ import org.whispersystems.textsecuregcm.providers.TimeProvider;
import org.whispersystems.textsecuregcm.push.FeedbackHandler;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.PushServiceClient;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
import org.whispersystems.textsecuregcm.sms.NexmoSmsSender;
import org.whispersystems.textsecuregcm.sms.SmsSender;
@@ -174,6 +175,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational());
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
PushSender pushSender = new PushSender(pushServiceClient, websocketSender);
ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager);
FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager);
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();
@@ -183,7 +185,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner);
KeysControllerV1 keysControllerV1 = new KeysControllerV1(rateLimiters, keys, accountsManager, federatedClientManager);
KeysControllerV2 keysControllerV2 = new KeysControllerV2(rateLimiters, keys, accountsManager, federatedClientManager);
MessageController messageController = new MessageController(rateLimiters, pushSender, accountsManager, federatedClientManager);
MessageController messageController = new MessageController(rateLimiters, pushSender, receiptSender, accountsManager, messagesManager, federatedClientManager);
environment.jersey().register(new MultiBasicAuthProvider<>(new FederatedPeerAuthenticator(config.getFederationConfiguration()),
FederatedPeer.class,
@@ -195,7 +197,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
environment.jersey().register(new DirectoryController(rateLimiters, directory));
environment.jersey().register(new FederationControllerV1(accountsManager, attachmentController, messageController, keysControllerV1));
environment.jersey().register(new FederationControllerV2(accountsManager, attachmentController, messageController, keysControllerV2));
environment.jersey().register(new ReceiptController(accountsManager, federatedClientManager, pushSender));
environment.jersey().register(new ReceiptController(receiptSender));
environment.jersey().register(new ProvisioningController(rateLimiters, pushSender));
environment.jersey().register(attachmentController);
environment.jersey().register(keysControllerV1);
@@ -205,12 +207,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
if (config.getWebsocketConfiguration().isEnabled()) {
WebSocketEnvironment webSocketEnvironment = new WebSocketEnvironment(environment, config, 90000);
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(deviceAuthenticator));
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, messagesManager, pubSubManager));
webSocketEnvironment.jersey().register(new KeepAliveController());
webSocketEnvironment.setConnectListener(new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, messagesManager, pubSubManager));
webSocketEnvironment.jersey().register(new KeepAliveController(pubSubManager));
WebSocketEnvironment provisioningEnvironment = new WebSocketEnvironment(environment, config);
provisioningEnvironment.setConnectListener(new ProvisioningConnectListener(pubSubManager));
provisioningEnvironment.jersey().register(new KeepAliveController());
provisioningEnvironment.jersey().register(new KeepAliveController(pubSubManager));
WebSocketResourceProviderFactory webSocketServlet = new WebSocketResourceProviderFactory(webSocketEnvironment );
WebSocketResourceProviderFactory provisioningServlet = new WebSocketResourceProviderFactory(provisioningEnvironment);

View File

@@ -200,6 +200,7 @@ public class AccountController {
public void setGcmRegistrationId(@Auth Account account, @Valid GcmRegistrationId registrationId) {
Device device = account.getAuthenticatedDevice().get();
device.setApnId(null);
device.setVoipApnId(null);
device.setGcmId(registrationId.getGcmRegistrationId());
if (registrationId.isWebSocketChannel()) device.setFetchesMessages(true);
@@ -225,6 +226,7 @@ public class AccountController {
public void setApnRegistrationId(@Auth Account account, @Valid ApnRegistrationId registrationId) {
Device device = account.getAuthenticatedDevice().get();
device.setApnId(registrationId.getApnRegistrationId());
device.setVoipApnId(registrationId.getVoipRegistrationId());
device.setGcmId(null);
device.setFetchesMessages(true);
accounts.update(account);

View File

@@ -1,19 +1,47 @@
package org.whispersystems.textsecuregcm.controllers;
import com.codahale.metrics.annotation.Timed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.PubSubManager;
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
import org.whispersystems.websocket.session.WebSocketSession;
import org.whispersystems.websocket.session.WebSocketSessionContext;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import io.dropwizard.auth.Auth;
@Path("/v1/keepalive")
public class KeepAliveController {
private final Logger logger = LoggerFactory.getLogger(KeepAliveController.class);
private final PubSubManager pubSubManager;
public KeepAliveController(PubSubManager pubSubManager) {
this.pubSubManager = pubSubManager;
}
@Timed
@GET
public Response getKeepAlive() {
public Response getKeepAlive(@Auth(required = false) Account account,
@WebSocketSession WebSocketSessionContext context)
{
if (account != null) {
WebsocketAddress address = new WebsocketAddress(account.getNumber(),
account.getAuthenticatedDevice().get().getId());
if (!pubSubManager.hasLocalSubscription(address)) {
logger.warn("***** No local subscription found for: " + address);
context.getClient().close(1000, "OK");
}
}
return Response.ok().build();
}

View File

@@ -26,6 +26,8 @@ import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import org.whispersystems.textsecuregcm.entities.MessageResponse;
import org.whispersystems.textsecuregcm.entities.MismatchedDevices;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.entities.SendMessageResponse;
import org.whispersystems.textsecuregcm.entities.StaleDevices;
import org.whispersystems.textsecuregcm.federation.FederatedClient;
@@ -34,15 +36,19 @@ import org.whispersystems.textsecuregcm.federation.NoSuchPeerException;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Base64;
import org.whispersystems.textsecuregcm.util.Util;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@@ -66,17 +72,23 @@ public class MessageController {
private final RateLimiters rateLimiters;
private final PushSender pushSender;
private final ReceiptSender receiptSender;
private final FederatedClientManager federatedClientManager;
private final AccountsManager accountsManager;
private final MessagesManager messagesManager;
public MessageController(RateLimiters rateLimiters,
PushSender pushSender,
ReceiptSender receiptSender,
AccountsManager accountsManager,
MessagesManager messagesManager,
FederatedClientManager federatedClientManager)
{
this.rateLimiters = rateLimiters;
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.accountsManager = accountsManager;
this.messagesManager = messagesManager;
this.federatedClientManager = federatedClientManager;
}
@@ -90,7 +102,7 @@ public class MessageController {
@Valid IncomingMessageList messages)
throws IOException, RateLimitExceededException
{
rateLimiters.getMessagesLimiter().validate(source.getNumber());
rateLimiters.getMessagesLimiter().validate(source.getNumber() + "__" + destinationName);
try {
boolean isSyncMessage = source.getNumber().equals(destinationName);
@@ -137,6 +149,38 @@ public class MessageController {
}
}
@Timed
@GET
@Produces(MediaType.APPLICATION_JSON)
public OutgoingMessageEntityList getPendingMessages(@Auth Account account) {
return new OutgoingMessageEntityList(messagesManager.getMessagesForDevice(account.getNumber(),
account.getAuthenticatedDevice()
.get().getId()));
}
@Timed
@DELETE
@Path("/{source}/{timestamp}")
public void removePendingMessage(@Auth Account account,
@PathParam("source") String source,
@PathParam("timestamp") long timestamp)
throws IOException
{
try {
Optional<OutgoingMessageEntity> message = messagesManager.delete(account.getNumber(), source, timestamp);
if (message.isPresent() && message.get().getType() != OutgoingMessageSignal.Type.RECEIPT_VALUE) {
receiptSender.sendReceipt(account,
message.get().getSource(),
message.get().getTimestamp(),
Optional.fromNullable(message.get().getRelay()));
}
} catch (NoSuchUserException | NotPushRegisteredException | TransientPushFailureException e) {
logger.warn("Sending delivery receipt", e);
}
}
private void sendLocalMessage(Account source,
String destinationName,
IncomingMessageList messages,

View File

@@ -46,7 +46,7 @@ public class ProvisioningController {
{
rateLimiters.getMessagesLimiter().validate(source.getNumber());
if (!websocketSender.sendProvisioningMessage(new ProvisioningAddress(destinationName),
if (!websocketSender.sendProvisioningMessage(new ProvisioningAddress(destinationName, 0),
Base64.decode(message.getBody())))
{
throw new WebApplicationException(Response.Status.NOT_FOUND);

View File

@@ -2,14 +2,10 @@ package org.whispersystems.textsecuregcm.controllers;
import com.codahale.metrics.annotation.Timed;
import com.google.common.base.Optional;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.federation.NoSuchPeerException;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
@@ -18,25 +14,16 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Set;
import io.dropwizard.auth.Auth;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
@Path("/v1/receipt")
public class ReceiptController {
private final AccountsManager accountManager;
private final PushSender pushSender;
private final FederatedClientManager federatedClientManager;
private final ReceiptSender receiptSender;
public ReceiptController(AccountsManager accountManager,
FederatedClientManager federatedClientManager,
PushSender pushSender)
{
this.accountManager = accountManager;
this.federatedClientManager = federatedClientManager;
this.pushSender = pushSender;
public ReceiptController(ReceiptSender receiptSender) {
this.receiptSender = receiptSender;
}
@Timed
@@ -49,11 +36,7 @@ public class ReceiptController {
throws IOException
{
try {
if (relay.isPresent() && !relay.get().isEmpty()) {
sendRelayedReceipt(source, destination, messageId, relay.get());
} else {
sendDirectReceipt(source, destination, messageId);
}
receiptSender.sendReceipt(source, destination, messageId, relay);
} catch (NoSuchUserException | NotPushRegisteredException e) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
} catch (TransientPushFailureException e) {
@@ -61,51 +44,4 @@ public class ReceiptController {
}
}
private void sendRelayedReceipt(Account source, String destination, long messageId, String relay)
throws NoSuchUserException, IOException
{
try {
federatedClientManager.getClient(relay)
.sendDeliveryReceipt(source.getNumber(),
source.getAuthenticatedDevice().get().getId(),
destination, messageId);
} catch (NoSuchPeerException e) {
throw new NoSuchUserException(e);
}
}
private void sendDirectReceipt(Account source, String destination, long messageId)
throws NotPushRegisteredException, TransientPushFailureException, NoSuchUserException
{
Account destinationAccount = getDestinationAccount(destination);
Set<Device> destinationDevices = destinationAccount.getDevices();
OutgoingMessageSignal.Builder message =
OutgoingMessageSignal.newBuilder()
.setSource(source.getNumber())
.setSourceDevice((int) source.getAuthenticatedDevice().get().getId())
.setTimestamp(messageId)
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
if (source.getRelay().isPresent()) {
message.setRelay(source.getRelay().get());
}
for (Device destinationDevice : destinationDevices) {
pushSender.sendMessage(destinationAccount, destinationDevice, message.build());
}
}
private Account getDestinationAccount(String destination)
throws NoSuchUserException
{
Optional<Account> account = accountManager.get(destination);
if (!account.isPresent()) {
throw new NoSuchUserException(destination);
}
return account.get();
}
}

View File

@@ -5,6 +5,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.hibernate.validator.constraints.NotEmpty;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
public class ApnMessage {
@JsonProperty
@@ -23,12 +24,17 @@ public class ApnMessage {
@NotEmpty
private String message;
@JsonProperty
@NotNull
private boolean voip;
public ApnMessage() {}
public ApnMessage(String apnId, String number, int deviceId, String message) {
public ApnMessage(String apnId, String number, int deviceId, String message, boolean voip) {
this.apnId = apnId;
this.number = number;
this.deviceId = deviceId;
this.message = message;
this.voip = voip;
}
}

View File

@@ -25,7 +25,14 @@ public class ApnRegistrationId {
@NotEmpty
private String apnRegistrationId;
@JsonProperty
private String voipRegistrationId;
public String getApnRegistrationId() {
return apnRegistrationId;
}
public String getVoipRegistrationId() {
return voipRegistrationId;
}
}

View File

@@ -0,0 +1,70 @@
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
public class OutgoingMessageEntity {
@JsonIgnore
private long id;
@JsonProperty
private int type;
@JsonProperty
private String relay;
@JsonProperty
private long timestamp;
@JsonProperty
private String source;
@JsonProperty
private int sourceDevice;
@JsonProperty
private byte[] message;
public OutgoingMessageEntity() {}
public OutgoingMessageEntity(long id, int type, String relay, long timestamp,
String source, int sourceDevice, byte[] message)
{
this.id = id;
this.type = type;
this.relay = relay;
this.timestamp = timestamp;
this.source = source;
this.sourceDevice = sourceDevice;
this.message = message;
}
public int getType() {
return type;
}
public String getRelay() {
return relay;
}
public long getTimestamp() {
return timestamp;
}
public String getSource() {
return source;
}
public int getSourceDevice() {
return sourceDevice;
}
public byte[] getMessage() {
return message;
}
public long getId() {
return id;
}
}

View File

@@ -0,0 +1,23 @@
package org.whispersystems.textsecuregcm.entities;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
public class OutgoingMessageEntityList {
@JsonProperty
private List<OutgoingMessageEntity> messages;
public OutgoingMessageEntityList() {}
public OutgoingMessageEntityList(List<OutgoingMessageEntity> messages) {
this.messages = messages;
}
@VisibleForTesting
public List<OutgoingMessageEntity> getMessages() {
return messages;
}
}

View File

@@ -11,6 +11,9 @@ public class UnregisteredEvent {
@NotEmpty
private String registrationId;
@JsonProperty
private String canonicalId;
@JsonProperty
@NotEmpty
private String number;
@@ -26,6 +29,10 @@ public class UnregisteredEvent {
return registrationId;
}
public String getCanonicalId() {
return canonicalId;
}
public String getNumber() {
return number;
}

View File

@@ -40,6 +40,6 @@ public class NonLimitedAccount extends Account {
@Override
public Optional<Device> getAuthenticatedDevice() {
return Optional.of(new Device(deviceId, null, null, null, null, null, false, 0, null, System.currentTimeMillis()));
return Optional.of(new Device(deviceId, null, null, null, null, null, null, false, 0, null, System.currentTimeMillis()));
}
}

View File

@@ -76,7 +76,13 @@ public class FeedbackHandler implements Managed, Runnable {
event.getTimestamp() > device.get().getPushTimestamp())
{
logger.info("GCM Unregister Timestamp matches!");
device.get().setGcmId(null);
if (event.getCanonicalId() != null && !event.getCanonicalId().isEmpty()) {
logger.info("It's a canonical ID update...");
device.get().setGcmId(event.getCanonicalId());
} else {
device.get().setGcmId(null);
}
accountsManager.update(account.get());
}
}

View File

@@ -25,6 +25,7 @@ import org.whispersystems.textsecuregcm.entities.GcmMessage;
import org.whispersystems.textsecuregcm.push.WebsocketSender.DeliveryStatus;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.util.Util;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
@@ -99,8 +100,12 @@ public class PushSender {
DeliveryStatus deliveryStatus = webSocketSender.sendMessage(account, device, outgoingMessage, WebsocketSender.Type.APN);
if (!deliveryStatus.isDelivered() && outgoingMessage.getType() != OutgoingMessageSignal.Type.RECEIPT_VALUE) {
ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(), (int)device.getId(),
String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth()));
String apnId = Util.isEmpty(device.getVoipApnId()) ? device.getApnId() : device.getVoipApnId();
boolean isVoip = !Util.isEmpty(device.getVoipApnId());
ApnMessage apnMessage = new ApnMessage(apnId, account.getNumber(), (int)device.getId(),
String.format(APN_PAYLOAD, deliveryStatus.getMessageQueueDepth()),
isVoip);
pushServiceClient.send(apnMessage);
}
}

View File

@@ -0,0 +1,89 @@
package org.whispersystems.textsecuregcm.push;
import com.google.common.base.Optional;
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.federation.NoSuchPeerException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import java.io.IOException;
import java.util.Set;
public class ReceiptSender {
private final PushSender pushSender;
private final FederatedClientManager federatedClientManager;
private final AccountsManager accountManager;
public ReceiptSender(AccountsManager accountManager,
PushSender pushSender,
FederatedClientManager federatedClientManager)
{
this.federatedClientManager = federatedClientManager;
this.accountManager = accountManager;
this.pushSender = pushSender;
}
public void sendReceipt(Account source, String destination,
long messageId, Optional<String> relay)
throws IOException, NoSuchUserException,
NotPushRegisteredException, TransientPushFailureException
{
if (relay.isPresent() && !relay.get().isEmpty()) {
sendRelayedReceipt(source, destination, messageId, relay.get());
} else {
sendDirectReceipt(source, destination, messageId);
}
}
private void sendRelayedReceipt(Account source, String destination, long messageId, String relay)
throws NoSuchUserException, IOException
{
try {
federatedClientManager.getClient(relay)
.sendDeliveryReceipt(source.getNumber(),
source.getAuthenticatedDevice().get().getId(),
destination, messageId);
} catch (NoSuchPeerException e) {
throw new NoSuchUserException(e);
}
}
private void sendDirectReceipt(Account source, String destination, long messageId)
throws NotPushRegisteredException, TransientPushFailureException, NoSuchUserException
{
Account destinationAccount = getDestinationAccount(destination);
Set<Device> destinationDevices = destinationAccount.getDevices();
MessageProtos.OutgoingMessageSignal.Builder message =
MessageProtos.OutgoingMessageSignal.newBuilder()
.setSource(source.getNumber())
.setSourceDevice((int) source.getAuthenticatedDevice().get().getId())
.setTimestamp(messageId)
.setType(MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE);
if (source.getRelay().isPresent()) {
message.setRelay(source.getRelay().get());
}
for (Device destinationDevice : destinationDevices) {
pushSender.sendMessage(destinationAccount, destinationDevice, message.build());
}
}
private Account getDestinationAccount(String destination)
throws NoSuchUserException
{
Optional<Account> account = accountManager.get(destination);
if (!account.isPresent()) {
throw new NoSuchUserException(destination);
}
return account.get();
}
}

View File

@@ -46,6 +46,9 @@ public class Device {
@JsonProperty
private String apnId;
@JsonProperty
private String voipApnId;
@JsonProperty
private long pushTimestamp;
@@ -65,8 +68,8 @@ public class Device {
public Device(long id, String authToken, String salt,
String signalingKey, String gcmId, String apnId,
boolean fetchesMessages, int registrationId,
SignedPreKey signedPreKey, long lastSeen)
String voipApnId, boolean fetchesMessages,
int registrationId, SignedPreKey signedPreKey, long lastSeen)
{
this.id = id;
this.authToken = authToken;
@@ -74,6 +77,7 @@ public class Device {
this.signalingKey = signalingKey;
this.gcmId = gcmId;
this.apnId = apnId;
this.voipApnId = voipApnId;
this.fetchesMessages = fetchesMessages;
this.registrationId = registrationId;
this.signedPreKey = signedPreKey;
@@ -92,6 +96,14 @@ public class Device {
}
}
public String getVoipApnId() {
return voipApnId;
}
public void setVoipApnId(String voipApnId) {
this.voipApnId = voipApnId;
}
public void setLastSeen(long lastSeen) {
this.lastSeen = lastSeen;
}

View File

@@ -1,6 +1,5 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.protobuf.ByteString;
import org.skife.jdbi.v2.SQLStatement;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.sqlobject.Bind;
@@ -12,7 +11,7 @@ import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.customizers.Mapper;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
@@ -44,11 +43,16 @@ public abstract class Messages {
@Mapper(MessageMapper.class)
@SqlQuery("SELECT * FROM messages WHERE " + DESTINATION + " = :destination AND " + DESTINATION_DEVICE + " = :destination_device ORDER BY " + TIMESTAMP + " ASC")
abstract List<Pair<Long, OutgoingMessageSignal>> load(@Bind("destination") String destination,
@Bind("destination_device") long destinationDevice);
abstract List<OutgoingMessageEntity> load(@Bind("destination") String destination,
@Bind("destination_device") long destinationDevice);
@SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id")
abstract void remove(@Bind("id") long id);
@Mapper(MessageMapper.class)
@SqlQuery("DELETE FROM messages WHERE " + ID + " IN (SELECT " + ID + " FROM messages WHERE " + DESTINATION + " = :destination AND " + SOURCE + " = :source AND " + TIMESTAMP + " = :timestamp ORDER BY " + ID + " LIMIT 1) RETURNING *")
abstract OutgoingMessageEntity remove(@Bind("destination") String destination, @Bind("source") String source, @Bind("timestamp") long timestamp);
@Mapper(MessageMapper.class)
@SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
abstract void remove(@Bind("destination") String destination, @Bind("id") long id);
@SqlUpdate("DELETE FROM messages WHERE " + DESTINATION + " = :destination")
abstract void clear(@Bind("destination") String destination);
@@ -56,20 +60,18 @@ public abstract class Messages {
@SqlUpdate("VACUUM messages")
public abstract void vacuum();
public static class MessageMapper implements ResultSetMapper<Pair<Long, OutgoingMessageSignal>> {
public static class MessageMapper implements ResultSetMapper<OutgoingMessageEntity> {
@Override
public Pair<Long, OutgoingMessageSignal> map(int i, ResultSet resultSet, StatementContext statementContext)
public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext)
throws SQLException
{
return new Pair<>(resultSet.getLong(ID),
OutgoingMessageSignal.newBuilder()
.setType(resultSet.getInt(TYPE))
.setRelay(resultSet.getString(RELAY))
.setTimestamp(resultSet.getLong(TIMESTAMP))
.setSource(resultSet.getString(SOURCE))
.setSourceDevice(resultSet.getInt(SOURCE_DEVICE))
.setMessage(ByteString.copyFrom(resultSet.getBytes(MESSAGE)))
.build());
return new OutgoingMessageEntity(resultSet.getLong(ID),
resultSet.getInt(TYPE),
resultSet.getString(RELAY),
resultSet.getLong(TIMESTAMP),
resultSet.getString(SOURCE),
resultSet.getInt(SOURCE_DEVICE),
resultSet.getBytes(MESSAGE));
}
}

View File

@@ -1,7 +1,9 @@
package org.whispersystems.textsecuregcm.storage;
import com.google.common.base.Optional;
import org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.util.Pair;
import java.util.List;
@@ -18,7 +20,7 @@ public class MessagesManager {
return this.messages.store(message, destination, destinationDevice) + 1;
}
public List<Pair<Long, OutgoingMessageSignal>> getMessagesForDevice(String destination, long destinationDevice) {
public List<OutgoingMessageEntity> getMessagesForDevice(String destination, long destinationDevice) {
return this.messages.load(destination, destinationDevice);
}
@@ -26,7 +28,11 @@ public class MessagesManager {
this.messages.clear(destination);
}
public void delete(long id) {
this.messages.remove(id);
public Optional<OutgoingMessageEntity> delete(String destination, String source, long timestamp) {
return Optional.fromNullable(this.messages.remove(destination, source, timestamp));
}
public void delete(String destination, long id) {
this.messages.remove(destination, id);
}
}

View File

@@ -59,6 +59,10 @@ public class PubSubManager implements Managed {
dispatchManager.unsubscribe(serializedAddress, dispatchChannel);
}
public boolean hasLocalSubscription(WebsocketAddress address) {
return dispatchManager.hasSubscription(address.serialize());
}
public boolean publish(WebsocketAddress address, PubSubMessage message) {
return publish(address.serialize().getBytes(), message);
}

View File

@@ -6,6 +6,7 @@ import com.codahale.metrics.SharedMetricRegistries;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -27,14 +28,17 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
private final AccountsManager accountsManager;
private final PushSender pushSender;
private final ReceiptSender receiptSender;
private final MessagesManager messagesManager;
private final PubSubManager pubSubManager;
public AuthenticatedConnectListener(AccountsManager accountsManager, PushSender pushSender,
MessagesManager messagesManager, PubSubManager pubSubManager)
ReceiptSender receiptSender, MessagesManager messagesManager,
PubSubManager pubSubManager)
{
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.pubSubManager = pubSubManager;
}
@@ -45,7 +49,7 @@ public class AuthenticatedConnectListener implements WebSocketConnectListener {
final Device device = account.getAuthenticatedDevice().get();
final long connectTime = System.currentTimeMillis();
final WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender,
final WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender,
messagesManager, account, device,
context.getClient());

View File

@@ -7,15 +7,16 @@ import java.security.SecureRandom;
public class ProvisioningAddress extends WebsocketAddress {
private final String address;
public ProvisioningAddress(String address, int id) throws InvalidWebsocketAddressException {
super(address, id);
}
public ProvisioningAddress(String address) throws InvalidWebsocketAddressException {
super(address, 0);
this.address = address;
public ProvisioningAddress(String serialized) throws InvalidWebsocketAddressException {
super(serialized);
}
public String getAddress() {
return address;
return getNumber();
}
public static ProvisioningAddress generate() {
@@ -24,7 +25,7 @@ public class ProvisioningAddress extends WebsocketAddress {
SecureRandom.getInstance("SHA1PRNG").nextBytes(random);
return new ProvisioningAddress(Base64.encodeBytesWithoutPadding(random)
.replace('+', '-').replace('/', '_'));
.replace('+', '-').replace('/', '_'), 0);
} catch (NoSuchAlgorithmException | InvalidWebsocketAddressException e) {
throw new AssertionError(e);
}

View File

@@ -52,10 +52,16 @@ public class ProvisioningConnection implements DispatchChannel {
@Override
public void onDispatchSubscribed(String channel) {
this.client.sendRequest("PUT", "/v1/address", Optional.of(ProvisioningUuid.newBuilder()
.setUuid(channel)
.build()
.toByteArray()));
try {
ProvisioningAddress address = new ProvisioningAddress(channel);
this.client.sendRequest("PUT", "/v1/address", Optional.of(ProvisioningUuid.newBuilder()
.setUuid(address.getAddress())
.build()
.toByteArray()));
} catch (InvalidWebsocketAddressException e) {
logger.warn("Badly formatted address", e);
this.client.close(1001, "Server Error");
}
}
@Override

View File

@@ -1,35 +1,34 @@
package org.whispersystems.textsecuregcm.websocket;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.textsecuregcm.controllers.NoSuchUserException;
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.websocket.WebSocketClient;
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.util.List;
import static com.codahale.metrics.MetricRegistry.name;
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
@@ -37,7 +36,7 @@ public class WebSocketConnection implements DispatchChannel {
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
private final AccountsManager accountsManager;
private final ReceiptSender receiptSender;
private final PushSender pushSender;
private final MessagesManager messagesManager;
@@ -45,17 +44,15 @@ public class WebSocketConnection implements DispatchChannel {
private final Device device;
private final WebSocketClient client;
private long connectionStartTime;
public WebSocketConnection(AccountsManager accountsManager,
PushSender pushSender,
public WebSocketConnection(PushSender pushSender,
ReceiptSender receiptSender,
MessagesManager messagesManager,
Account account,
Device device,
WebSocketClient client)
{
this.accountsManager = accountsManager;
this.pushSender = pushSender;
this.receiptSender = receiptSender;
this.messagesManager = messagesManager;
this.account = account;
this.device = device;
@@ -105,9 +102,9 @@ public class WebSocketConnection implements DispatchChannel {
boolean isReceipt = message.getType() == OutgoingMessageSignal.Type.RECEIPT_VALUE;
if (isSuccessResponse(response)) {
if (storedMessageId.isPresent()) messagesManager.delete(storedMessageId.get());
if (storedMessageId.isPresent()) messagesManager.delete(account.getNumber(), storedMessageId.get());
if (!isReceipt) sendDeliveryReceiptFor(message);
} else if (!isSuccessResponse(response) & !storedMessageId.isPresent()) {
} else if (!isSuccessResponse(response) && !storedMessageId.isPresent()) {
requeueMessage(message);
}
}
@@ -137,34 +134,32 @@ public class WebSocketConnection implements DispatchChannel {
private void sendDeliveryReceiptFor(OutgoingMessageSignal message) {
try {
Optional<Account> source = accountsManager.get(message.getSource());
if (!source.isPresent()) {
logger.warn(String.format("Source account disappeared? (%s)", message.getSource()));
return;
}
OutgoingMessageSignal.Builder receipt =
OutgoingMessageSignal.newBuilder()
.setSource(account.getNumber())
.setSourceDevice((int) device.getId())
.setTimestamp(message.getTimestamp())
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
for (Device device : source.get().getDevices()) {
pushSender.sendMessage(source.get(), device, receipt.build());
}
} catch (NotPushRegisteredException | TransientPushFailureException e) {
logger.warn("sendDeliveryReceiptFor", "Delivery receipet", e);
receiptSender.sendReceipt(account, message.getSource(), message.getTimestamp(),
message.hasRelay() ? Optional.of(message.getRelay()) :
Optional.<String>absent());
} catch (IOException | NoSuchUserException | TransientPushFailureException | NotPushRegisteredException e) {
logger.warn("sendDeliveryReceiptFor", e);
} catch (WebApplicationException e) {
logger.warn("Bad federated response for receipt: " + e.getResponse().getStatus());
}
}
private void processStoredMessages() {
List<Pair<Long, OutgoingMessageSignal>> messages = messagesManager.getMessagesForDevice(account.getNumber(),
device.getId());
List<OutgoingMessageEntity> messages = messagesManager.getMessagesForDevice(account.getNumber(), device.getId());
for (Pair<Long, OutgoingMessageSignal> message : messages) {
sendMessage(message.second(), Optional.of(message.first()));
for (OutgoingMessageEntity message : messages) {
OutgoingMessageSignal.Builder builder = OutgoingMessageSignal.newBuilder()
.setType(message.getType())
.setMessage(ByteString.copyFrom(message.getMessage()))
.setSourceDevice(message.getSourceDevice())
.setSource(message.getSource())
.setTimestamp(message.getTimestamp());
if (message.getRelay() != null && !message.getRelay().isEmpty()) {
builder.setRelay(message.getRelay());
}
sendMessage(builder.build(), Optional.of(message.getId()));
}
}

View File

@@ -4,7 +4,6 @@ package org.whispersystems.textsecuregcm.tests.controllers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.sun.jersey.api.client.ClientResponse;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -21,15 +20,16 @@ import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import io.dropwizard.testing.junit.ResourceTestRule;
@@ -47,8 +47,10 @@ public class FederatedControllerTest {
private static final String MULTI_DEVICE_RECIPIENT = "+14152222222";
private PushSender pushSender = mock(PushSender.class );
private ReceiptSender receiptSender = mock(ReceiptSender.class);
private FederatedClientManager federatedClientManager = mock(FederatedClientManager.class);
private AccountsManager accountsManager = mock(AccountsManager.class );
private MessagesManager messagesManager = mock(MessagesManager.class);
private RateLimiters rateLimiters = mock(RateLimiters.class );
private RateLimiter rateLimiter = mock(RateLimiter.class );
@@ -57,7 +59,7 @@ public class FederatedControllerTest {
private final ObjectMapper mapper = new ObjectMapper();
private final MessageController messageController = new MessageController(rateLimiters, pushSender, accountsManager, federatedClientManager);
private final MessageController messageController = new MessageController(rateLimiters, pushSender, receiptSender, accountsManager, messagesManager, federatedClientManager);
private final KeysControllerV2 keysControllerV2 = mock(KeysControllerV2.class);
@Rule
@@ -72,12 +74,12 @@ public class FederatedControllerTest {
@Before
public void setup() throws Exception {
Set<Device> singleDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 111, null, System.currentTimeMillis()));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 111, null, System.currentTimeMillis()));
}};
Set<Device> multiDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 222, null, System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, false, 333, null, System.currentTimeMillis()));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 222, null, System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, null, false, 333, null, System.currentTimeMillis()));
}};
Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, false, singleDeviceList);

View File

@@ -3,28 +3,34 @@ package org.whispersystems.textsecuregcm.tests.controllers;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.sun.jersey.api.client.ClientResponse;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.entities.IncomingMessageList;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.entities.MismatchedDevices;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntityList;
import org.whispersystems.textsecuregcm.entities.SignedPreKey;
import org.whispersystems.textsecuregcm.entities.StaleDevices;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.storage.MessagesManager;
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -32,6 +38,7 @@ import io.dropwizard.testing.junit.ResourceTestRule;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
@@ -44,8 +51,10 @@ public class MessageControllerTest {
private static final String MULTI_DEVICE_RECIPIENT = "+14152222222";
private final PushSender pushSender = mock(PushSender.class );
private final ReceiptSender receiptSender = mock(ReceiptSender.class);
private final FederatedClientManager federatedClientManager = mock(FederatedClientManager.class);
private final AccountsManager accountsManager = mock(AccountsManager.class );
private final MessagesManager messagesManager = mock(MessagesManager.class);
private final RateLimiters rateLimiters = mock(RateLimiters.class );
private final RateLimiter rateLimiter = mock(RateLimiter.class );
@@ -54,21 +63,21 @@ public class MessageControllerTest {
@Rule
public final ResourceTestRule resources = ResourceTestRule.builder()
.addProvider(AuthHelper.getAuthenticator())
.addResource(new MessageController(rateLimiters, pushSender, accountsManager,
federatedClientManager))
.addResource(new MessageController(rateLimiters, pushSender, receiptSender, accountsManager,
messagesManager, federatedClientManager))
.build();
@Before
public void setup() throws Exception {
Set<Device> singleDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 111, null, System.currentTimeMillis()));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 111, null, System.currentTimeMillis()));
}};
Set<Device> multiDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 222, new SignedPreKey(111, "foo", "bar"), System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, false, 333, new SignedPreKey(222, "oof", "rab"), System.currentTimeMillis()));
add(new Device(3, "foo", "bar", "baz", "isgcm", null, false, 444, null, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(31)));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 222, new SignedPreKey(111, "foo", "bar"), System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, null, false, 333, new SignedPreKey(222, "oof", "rab"), System.currentTimeMillis()));
add(new Device(3, "foo", "bar", "baz", "isgcm", null, null, false, 444, null, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(31)));
}};
Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, false, singleDeviceList);
@@ -177,4 +186,75 @@ public class MessageControllerTest {
}
@Test
public synchronized void testGetMessages() throws Exception {
final long timestampOne = 313377;
final long timestampTwo = 313388;
List<OutgoingMessageEntity> messages = new LinkedList<OutgoingMessageEntity>() {{
add(new OutgoingMessageEntity(1L, MessageProtos.OutgoingMessageSignal.Type.CIPHERTEXT_VALUE, null, timestampOne, "+14152222222", 2, "hi there".getBytes()));
add(new OutgoingMessageEntity(2L, MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE, null, timestampTwo, "+14152222222", 2, null));
}};
when(messagesManager.getMessagesForDevice(eq(AuthHelper.VALID_NUMBER), eq(1L))).thenReturn(messages);
OutgoingMessageEntityList response =
resources.client().resource("/v1/messages/")
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
.accept(MediaType.APPLICATION_JSON_TYPE)
.get(OutgoingMessageEntityList.class);
assertEquals(response.getMessages().size(), 2);
assertEquals(response.getMessages().get(0).getId(), 0);
assertEquals(response.getMessages().get(1).getId(), 0);
assertEquals(response.getMessages().get(0).getTimestamp(), timestampOne);
assertEquals(response.getMessages().get(1).getTimestamp(), timestampTwo);
}
@Test
public synchronized void testDeleteMessages() throws Exception {
long timestamp = System.currentTimeMillis();
when(messagesManager.delete(AuthHelper.VALID_NUMBER, "+14152222222", 31337))
.thenReturn(Optional.of(new OutgoingMessageEntity(31337L,
MessageProtos.OutgoingMessageSignal.Type.CIPHERTEXT_VALUE,
null, timestamp,
"+14152222222", 1, "hi".getBytes())));
when(messagesManager.delete(AuthHelper.VALID_NUMBER, "+14152222222", 31338))
.thenReturn(Optional.of(new OutgoingMessageEntity(31337L,
MessageProtos.OutgoingMessageSignal.Type.RECEIPT_VALUE,
null, System.currentTimeMillis(),
"+14152222222", 1, null)));
when(messagesManager.delete(AuthHelper.VALID_NUMBER, "+14152222222", 31339))
.thenReturn(Optional.<OutgoingMessageEntity>absent());
ClientResponse response = resources.client().resource(String.format("/v1/messages/%s/%d", "+14152222222", 31337))
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
.delete(ClientResponse.class);
assertThat("Good Response Code", response.getStatus(), is(equalTo(204)));
verify(receiptSender).sendReceipt(any(Account.class), eq("+14152222222"), eq(timestamp), eq(Optional.<String>absent()));
response = resources.client().resource(String.format("/v1/messages/%s/%d", "+14152222222", 31338))
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
.delete(ClientResponse.class);
assertThat("Good Response Code", response.getStatus(), is(equalTo(204)));
verifyNoMoreInteractions(receiptSender);
response = resources.client().resource(String.format("/v1/messages/%s/%d", "+14152222222", 31339))
.header("Authorization", AuthHelper.getAuthHeader(AuthHelper.VALID_NUMBER, AuthHelper.VALID_PASSWORD))
.delete(ClientResponse.class);
assertThat("Good Response Code", response.getStatus(), is(equalTo(204)));
verifyNoMoreInteractions(receiptSender);
}
}

View File

@@ -6,28 +6,21 @@ import com.sun.jersey.api.client.ClientResponse;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.whispersystems.textsecuregcm.controllers.MessageController;
import org.whispersystems.textsecuregcm.controllers.ReceiptController;
import org.whispersystems.textsecuregcm.entities.MessageProtos;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.limits.RateLimiter;
import org.whispersystems.textsecuregcm.limits.RateLimiters;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
import org.whispersystems.textsecuregcm.tests.util.AuthHelper;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.dropwizard.testing.junit.ResourceTestRule;
import static org.fest.assertions.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;
@@ -40,23 +33,25 @@ public class ReceiptControllerTest {
private final FederatedClientManager federatedClientManager = mock(FederatedClientManager.class);
private final AccountsManager accountsManager = mock(AccountsManager.class );
private final ReceiptSender receiptSender = new ReceiptSender(accountsManager, pushSender, federatedClientManager);
private final ObjectMapper mapper = new ObjectMapper();
@Rule
public final ResourceTestRule resources = ResourceTestRule.builder()
.addProvider(AuthHelper.getAuthenticator())
.addResource(new ReceiptController(accountsManager, federatedClientManager, pushSender))
.addResource(new ReceiptController(receiptSender))
.build();
@Before
public void setup() throws Exception {
Set<Device> singleDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 111, null, System.currentTimeMillis()));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 111, null, System.currentTimeMillis()));
}};
Set<Device> multiDeviceList = new HashSet<Device>() {{
add(new Device(1, "foo", "bar", "baz", "isgcm", null, false, 222, null, System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, false, 333, null, System.currentTimeMillis()));
add(new Device(1, "foo", "bar", "baz", "isgcm", null, null, false, 222, null, System.currentTimeMillis()));
add(new Device(2, "foo", "bar", "baz", "isgcm", null, null, false, 333, null, System.currentTimeMillis()));
}};
Account singleDeviceAccount = new Account(SINGLE_DEVICE_RECIPIENT, false, singleDeviceList);

View File

@@ -9,7 +9,9 @@ import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.Device;
@@ -57,12 +59,13 @@ public class WebSocketConnectionTest {
private static final Device device = mock(Device.class );
private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class );
private static final PushSender pushSender = mock(PushSender.class);
private static final ReceiptSender receiptSender = mock(ReceiptSender.class);
@Test
public void testCredentials() throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class);
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, storedMessages, pubSubManager);
AuthenticatedConnectListener connectListener = new AuthenticatedConnectListener(accountsManager, pushSender, receiptSender, storedMessages, pubSubManager);
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
@@ -98,10 +101,10 @@ public class WebSocketConnectionTest {
public void testOpen() throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class);
List<Pair<Long, OutgoingMessageSignal>> outgoingMessages = new LinkedList<Pair<Long, OutgoingMessageSignal>> () {{
add(new Pair<>(1L, createMessage("sender1", 1111, false, "first")));
add(new Pair<>(2L, createMessage("sender1", 2222, false, "second")));
add(new Pair<>(3L, createMessage("sender2", 3333, false, "third")));
List<OutgoingMessageEntity> outgoingMessages = new LinkedList<OutgoingMessageEntity> () {{
add(createMessage(1L, "sender1", 1111, false, "first"));
add(createMessage(2L, "sender1", 2222, false, "second"));
add(createMessage(3L, "sender2", 3333, false, "third"));
}};
when(device.getId()).thenReturn(2L);
@@ -139,7 +142,7 @@ public class WebSocketConnectionTest {
});
WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId());
WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender, storedMessages,
WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages,
account, device, client);
connection.onDispatchSubscribed(websocketAddress.serialize());
@@ -154,25 +157,102 @@ public class WebSocketConnectionTest {
futures.get(0).setException(new IOException());
futures.get(2).setException(new IOException());
List<OutgoingMessageSignal> pending = new LinkedList<OutgoingMessageSignal>() {{
add(createMessage("sender1", 1111, false, "first"));
add(createMessage("sender2", 3333, false, "third"));
}};
verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(OutgoingMessageSignal.class));
verify(storedMessages, times(1)).delete(eq(account.getNumber()), eq(2L));
verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender1"), eq(2222L), eq(Optional.<String>absent()));
connection.onDispatchUnsubscribed(websocketAddress.serialize());
verify(client).close(anyInt(), anyString());
}
private OutgoingMessageSignal createMessage(String sender, long timestamp, boolean receipt, String content) {
return OutgoingMessageSignal.newBuilder()
.setSource(sender)
.setSourceDevice(1)
.setType(receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE)
.setTimestamp(timestamp)
.setMessage(ByteString.copyFrom(content.getBytes()))
.build();
@Test
public void testOnlineSend() throws Exception {
MessagesManager storedMessages = mock(MessagesManager.class);
OutgoingMessageSignal firstMessage = OutgoingMessageSignal.newBuilder()
.setMessage(ByteString.copyFrom("first".getBytes()))
.setSource("sender1")
.setTimestamp(System.currentTimeMillis())
.setSourceDevice(1)
.setType(OutgoingMessageSignal.Type.CIPHERTEXT_VALUE)
.build();
OutgoingMessageSignal secondMessage = OutgoingMessageSignal.newBuilder()
.setMessage(ByteString.copyFrom("second".getBytes()))
.setSource("sender2")
.setTimestamp(System.currentTimeMillis())
.setSourceDevice(2)
.setType(OutgoingMessageSignal.Type.CIPHERTEXT_VALUE)
.build();
List<OutgoingMessageEntity> pendingMessages = new LinkedList<>();
when(device.getId()).thenReturn(2L);
when(device.getSignalingKey()).thenReturn(Base64.encodeBytes(new byte[52]));
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
when(account.getNumber()).thenReturn("+14152222222");
final Device sender1device = mock(Device.class);
Set<Device> sender1devices = new HashSet<Device>() {{
add(sender1device);
}};
Account sender1 = mock(Account.class);
when(sender1.getDevices()).thenReturn(sender1devices);
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
when(storedMessages.getMessagesForDevice(account.getNumber(), device.getId()))
.thenReturn(pendingMessages);
final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
final WebSocketClient client = mock(WebSocketClient.class);
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class)))
.thenAnswer(new Answer<SettableFuture<WebSocketResponseMessage>>() {
@Override
public SettableFuture<WebSocketResponseMessage> answer(InvocationOnMock invocationOnMock) throws Throwable {
SettableFuture<WebSocketResponseMessage> future = SettableFuture.create();
futures.add(future);
return future;
}
});
WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId());
WebSocketConnection connection = new WebSocketConnection(pushSender, receiptSender, storedMessages,
account, device, client);
connection.onDispatchSubscribed(websocketAddress.serialize());
connection.onDispatchMessage(websocketAddress.serialize(), PubSubProtos.PubSubMessage.newBuilder()
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
.setContent(ByteString.copyFrom(firstMessage.toByteArray()))
.build().toByteArray());
connection.onDispatchMessage(websocketAddress.serialize(), PubSubProtos.PubSubMessage.newBuilder()
.setType(PubSubProtos.PubSubMessage.Type.DELIVER)
.setContent(ByteString.copyFrom(secondMessage.toByteArray()))
.build().toByteArray());
verify(client, times(2)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class));
assertEquals(futures.size(), 2);
WebSocketResponseMessage response = mock(WebSocketResponseMessage.class);
when(response.getStatus()).thenReturn(200);
futures.get(1).set(response);
futures.get(0).setException(new IOException());
verify(receiptSender, times(1)).sendReceipt(eq(account), eq("sender2"), eq(secondMessage.getTimestamp()), eq(Optional.<String>absent()));
verify(pushSender, times(1)).sendMessage(eq(account), eq(device), any(OutgoingMessageSignal.class));
connection.onDispatchUnsubscribed(websocketAddress.serialize());
verify(client).close(anyInt(), anyString());
}
private OutgoingMessageEntity createMessage(long id, String sender, long timestamp, boolean receipt, String content) {
return new OutgoingMessageEntity(id, receipt ? OutgoingMessageSignal.Type.RECEIPT_VALUE : OutgoingMessageSignal.Type.CIPHERTEXT_VALUE,
null, timestamp, sender, 1, content.getBytes());
}
}