mirror of
https://github.com/signalapp/Signal-Server.git
synced 2025-12-17 02:10:36 +00:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aa2a5ff929 | ||
|
|
56d3c1e73f | ||
|
|
f401f9a674 | ||
|
|
30933d792b | ||
|
|
905717977e | ||
|
|
b802994809 | ||
|
|
ac96f906b3 | ||
|
|
cc395e914f | ||
|
|
f8063f8faf | ||
|
|
958ada9110 | ||
|
|
3452ea29b8 | ||
|
|
675b6f4b5e | ||
|
|
4fab67b0f5 | ||
|
|
8a2131416d | ||
|
|
2525304215 | ||
|
|
fdb35d4f77 | ||
|
|
222c7ea641 |
15
pom.xml
15
pom.xml
@@ -9,10 +9,10 @@
|
||||
|
||||
<groupId>org.whispersystems.textsecure</groupId>
|
||||
<artifactId>TextSecureServer</artifactId>
|
||||
<version>0.23</version>
|
||||
<version>0.26</version>
|
||||
|
||||
<properties>
|
||||
<dropwizard.version>0.7.0</dropwizard.version>
|
||||
<dropwizard.version>0.7.1</dropwizard.version>
|
||||
<jackson.api.version>2.3.3</jackson.api.version>
|
||||
<commons-codec.version>1.6</commons-codec.version>
|
||||
</properties>
|
||||
@@ -53,6 +53,12 @@
|
||||
<artifactId>dropwizard-metrics-graphite</artifactId>
|
||||
<version>${dropwizard.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.dropwizard</groupId>
|
||||
<artifactId>dropwizard-client</artifactId>
|
||||
<version>${dropwizard.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>com.sun.jersey</groupId>
|
||||
@@ -130,6 +136,11 @@
|
||||
<artifactId>smack-tcp</artifactId>
|
||||
<version>4.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.whispersystems.websocket</groupId>
|
||||
<artifactId>websocket-resources</artifactId>
|
||||
<version>0.1-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
@@ -24,7 +24,9 @@ import org.whispersystems.textsecuregcm.configuration.GraphiteConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.MemcacheConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.MetricsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.NexmoConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RedPhoneConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.RedisConfiguration;
|
||||
import org.whispersystems.textsecuregcm.configuration.S3Configuration;
|
||||
import org.whispersystems.textsecuregcm.configuration.TwilioConfiguration;
|
||||
@@ -34,6 +36,7 @@ import javax.validation.Valid;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import io.dropwizard.Configuration;
|
||||
import io.dropwizard.client.JerseyClientConfiguration;
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
|
||||
public class WhisperServerConfiguration extends Configuration {
|
||||
@@ -49,7 +52,7 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty
|
||||
private GcmConfiguration gcm;
|
||||
private PushConfiguration push;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
@@ -66,9 +69,6 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
@JsonProperty
|
||||
private RedisConfiguration redis;
|
||||
|
||||
@JsonProperty
|
||||
private ApnConfiguration apn = new ApnConfiguration();
|
||||
|
||||
@Valid
|
||||
@JsonProperty
|
||||
private FederationConfiguration federation = new FederationConfiguration();
|
||||
@@ -95,6 +95,15 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
@JsonProperty
|
||||
private WebsocketConfiguration websocket = new WebsocketConfiguration();
|
||||
|
||||
@JsonProperty
|
||||
private RedPhoneConfiguration redphone = new RedPhoneConfiguration();
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private JerseyClientConfiguration httpClient = new JerseyClientConfiguration();
|
||||
|
||||
|
||||
public WebsocketConfiguration getWebsocketConfiguration() {
|
||||
return websocket;
|
||||
}
|
||||
@@ -107,12 +116,12 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
return nexmo;
|
||||
}
|
||||
|
||||
public GcmConfiguration getGcmConfiguration() {
|
||||
return gcm;
|
||||
public PushConfiguration getPushConfiguration() {
|
||||
return push;
|
||||
}
|
||||
|
||||
public ApnConfiguration getApnConfiguration() {
|
||||
return apn;
|
||||
public JerseyClientConfiguration getJerseyClientConfiguration() {
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
public S3Configuration getS3Configuration() {
|
||||
@@ -146,4 +155,8 @@ public class WhisperServerConfiguration extends Configuration {
|
||||
public MetricsConfiguration getMetricsConfiguration() {
|
||||
return viz;
|
||||
}
|
||||
|
||||
public RedPhoneConfiguration getRedphoneConfiguration() {
|
||||
return redphone;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.graphite.GraphiteReporter;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.google.common.base.Optional;
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import net.spy.memcached.MemcachedClient;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
import org.eclipse.jetty.servlets.CrossOriginFilter;
|
||||
@@ -52,9 +53,10 @@ import org.whispersystems.textsecuregcm.providers.MemcacheHealthCheck;
|
||||
import org.whispersystems.textsecuregcm.providers.MemcachedClientFactory;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisHealthCheck;
|
||||
import org.whispersystems.textsecuregcm.push.APNSender;
|
||||
import org.whispersystems.textsecuregcm.push.GCMSender;
|
||||
import org.whispersystems.textsecuregcm.providers.TimeProvider;
|
||||
import org.whispersystems.textsecuregcm.push.FeedbackHandler;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.PushServiceClient;
|
||||
import org.whispersystems.textsecuregcm.push.WebsocketSender;
|
||||
import org.whispersystems.textsecuregcm.sms.NexmoSmsSender;
|
||||
import org.whispersystems.textsecuregcm.sms.SmsSender;
|
||||
@@ -72,9 +74,12 @@ import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.UrlSigner;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory;
|
||||
import org.whispersystems.textsecuregcm.websocket.ConnectListener;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.workers.DirectoryCommand;
|
||||
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
|
||||
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
|
||||
import org.whispersystems.websocket.setup.WebSocketEnvironment;
|
||||
|
||||
import javax.servlet.DispatcherType;
|
||||
import javax.servlet.FilterRegistration;
|
||||
@@ -85,6 +90,7 @@ import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import io.dropwizard.Application;
|
||||
import io.dropwizard.client.JerseyClientBuilder;
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
import io.dropwizard.jdbi.DBIFactory;
|
||||
import io.dropwizard.metrics.graphite.GraphiteReporterFactory;
|
||||
@@ -133,6 +139,8 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
|
||||
MemcachedClient memcachedClient = new MemcachedClientFactory(config.getMemcacheConfiguration()).getClient();
|
||||
JedisPool redisClient = new RedisClientFactory(config.getRedisConfiguration()).getRedisClientPool();
|
||||
Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration())
|
||||
.build(getName());
|
||||
|
||||
DirectoryManager directory = new DirectoryManager(redisClient);
|
||||
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, memcachedClient);
|
||||
@@ -141,28 +149,20 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
FederatedClientManager federatedClientManager = new FederatedClientManager(config.getFederationConfiguration());
|
||||
StoredMessages storedMessages = new StoredMessages(redisClient);
|
||||
PubSubManager pubSubManager = new PubSubManager(redisClient);
|
||||
PushServiceClient pushServiceClient = new PushServiceClient(httpClient, config.getPushConfiguration());
|
||||
WebsocketSender websocketSender = new WebsocketSender(storedMessages, pubSubManager);
|
||||
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), memcachedClient);
|
||||
|
||||
APNSender apnSender = new APNSender(accountsManager, pubSubManager, storedMessages, memcachedClient,
|
||||
config.getApnConfiguration().getCertificate(),
|
||||
config.getApnConfiguration().getKey());
|
||||
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
|
||||
Optional<NexmoSmsSender> nexmoSmsSender = initializeNexmoSmsSender(config.getNexmoConfiguration());
|
||||
SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational());
|
||||
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
|
||||
PushSender pushSender = new PushSender(pushServiceClient, websocketSender);
|
||||
FeedbackHandler feedbackHandler = new FeedbackHandler(pushServiceClient, accountsManager);
|
||||
Optional<byte[]> authorizationKey = config.getRedphoneConfiguration().getAuthorizationKey();
|
||||
|
||||
GCMSender gcmSender = new GCMSender(accountsManager,
|
||||
config.getGcmConfiguration().getSenderId(),
|
||||
config.getGcmConfiguration().getApiKey());
|
||||
|
||||
WebsocketSender websocketSender = new WebsocketSender(storedMessages, pubSubManager);
|
||||
|
||||
environment.lifecycle().manage(apnSender);
|
||||
environment.lifecycle().manage(gcmSender);
|
||||
|
||||
AccountAuthenticator deviceAuthenticator = new AccountAuthenticator(accountsManager);
|
||||
RateLimiters rateLimiters = new RateLimiters(config.getLimitsConfiguration(), memcachedClient);
|
||||
|
||||
TwilioSmsSender twilioSmsSender = new TwilioSmsSender(config.getTwilioConfiguration());
|
||||
Optional<NexmoSmsSender> nexmoSmsSender = initializeNexmoSmsSender(config.getNexmoConfiguration());
|
||||
SmsSender smsSender = new SmsSender(twilioSmsSender, nexmoSmsSender, config.getTwilioConfiguration().isInternational());
|
||||
UrlSigner urlSigner = new UrlSigner(config.getS3Configuration());
|
||||
PushSender pushSender = new PushSender(gcmSender, apnSender, websocketSender);
|
||||
environment.lifecycle().manage(feedbackHandler);
|
||||
|
||||
AttachmentController attachmentController = new AttachmentController(rateLimiters, federatedClientManager, urlSigner);
|
||||
KeysControllerV1 keysControllerV1 = new KeysControllerV1(rateLimiters, keys, accountsManager, federatedClientManager);
|
||||
@@ -174,7 +174,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
deviceAuthenticator,
|
||||
Device.class, "WhisperServer"));
|
||||
|
||||
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, rateLimiters, smsSender, storedMessages));
|
||||
environment.jersey().register(new AccountController(pendingAccountsManager, accountsManager, rateLimiters, smsSender, storedMessages, new TimeProvider(), authorizationKey));
|
||||
environment.jersey().register(new DeviceController(pendingDevicesManager, accountsManager, rateLimiters));
|
||||
environment.jersey().register(new DirectoryController(rateLimiters, directory));
|
||||
environment.jersey().register(new FederationControllerV1(accountsManager, attachmentController, messageController, keysControllerV1));
|
||||
@@ -186,11 +186,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
||||
environment.jersey().register(messageController);
|
||||
|
||||
if (config.getWebsocketConfiguration().isEnabled()) {
|
||||
WebsocketControllerFactory servlet = new WebsocketControllerFactory(deviceAuthenticator,
|
||||
accountsManager,
|
||||
pushSender,
|
||||
storedMessages,
|
||||
pubSubManager);
|
||||
WebSocketEnvironment webSocketEnvironment = new WebSocketEnvironment(environment);
|
||||
webSocketEnvironment.setAuthenticator(new WebSocketAccountAuthenticator(deviceAuthenticator));
|
||||
webSocketEnvironment.setConnectListener(new ConnectListener(accountsManager, pushSender, storedMessages, pubSubManager));
|
||||
|
||||
WebSocketResourceProviderFactory servlet = new WebSocketResourceProviderFactory(webSocketEnvironment);
|
||||
|
||||
ServletRegistration.Dynamic websocket = environment.servlets().addServlet("WebSocket", servlet);
|
||||
websocket.addMapping("/v1/websocket/*");
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
package org.whispersystems.textsecuregcm.auth;
|
||||
|
||||
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
import javax.crypto.Mac;
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class AuthorizationToken {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(AuthorizationToken.class);
|
||||
|
||||
private final String token;
|
||||
private final byte[] key;
|
||||
|
||||
public AuthorizationToken(String token, byte[] key) {
|
||||
this.token = token;
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
public boolean isValid(String number, long currentTimeMillis) {
|
||||
String[] parts = token.split(":");
|
||||
|
||||
if (parts.length != 3) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!number.equals(parts[0])) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!isValidTime(parts[1], currentTimeMillis)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return isValidSignature(parts[0] + ":" + parts[1], parts[2]);
|
||||
}
|
||||
|
||||
private boolean isValidTime(String timeString, long currentTimeMillis) {
|
||||
try {
|
||||
long tokenTime = Long.parseLong(timeString);
|
||||
long ourTime = TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis);
|
||||
|
||||
return TimeUnit.SECONDS.toHours(Math.abs(ourTime - tokenTime)) < 24;
|
||||
} catch (NumberFormatException e) {
|
||||
logger.warn("Number Format", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isValidSignature(String prefix, String suffix) {
|
||||
try {
|
||||
Mac hmac = Mac.getInstance("HmacSHA256");
|
||||
hmac.init(new SecretKeySpec(key, "HmacSHA256"));
|
||||
|
||||
byte[] ourSuffix = Util.truncate(hmac.doFinal(prefix.getBytes()), 10);
|
||||
byte[] theirSuffix = Hex.decodeHex(suffix.toCharArray());
|
||||
|
||||
return MessageDigest.isEqual(ourSuffix, theirSuffix);
|
||||
} catch (NoSuchAlgorithmException | InvalidKeyException e) {
|
||||
throw new AssertionError(e);
|
||||
} catch (DecoderException e) {
|
||||
logger.warn("Authorizationtoken", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.hibernate.validator.constraints.NotEmpty;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
public class PushConfiguration {
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String host;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int port;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String username;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String password;
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return username;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package org.whispersystems.textsecuregcm.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Optional;
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
|
||||
public class RedPhoneConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
private String authKey;
|
||||
|
||||
public Optional<byte[]> getAuthorizationKey() throws DecoderException {
|
||||
if (authKey == null || authKey.trim().length() == 0) {
|
||||
return Optional.absent();
|
||||
}
|
||||
|
||||
return Optional.of(Hex.decodeHex(authKey.toCharArray()));
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,9 @@ import org.whispersystems.textsecuregcm.auth.InvalidAuthorizationHeaderException
|
||||
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
|
||||
import org.whispersystems.textsecuregcm.entities.ApnRegistrationId;
|
||||
import org.whispersystems.textsecuregcm.entities.GcmRegistrationId;
|
||||
import org.whispersystems.textsecuregcm.auth.AuthorizationToken;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.providers.TimeProvider;
|
||||
import org.whispersystems.textsecuregcm.sms.SmsSender;
|
||||
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
@@ -68,18 +70,24 @@ public class AccountController {
|
||||
private final RateLimiters rateLimiters;
|
||||
private final SmsSender smsSender;
|
||||
private final StoredMessages storedMessages;
|
||||
private final TimeProvider timeProvider;
|
||||
private final Optional<byte[]> authorizationKey;
|
||||
|
||||
public AccountController(PendingAccountsManager pendingAccounts,
|
||||
AccountsManager accounts,
|
||||
RateLimiters rateLimiters,
|
||||
SmsSender smsSenderFactory,
|
||||
StoredMessages storedMessages)
|
||||
StoredMessages storedMessages,
|
||||
TimeProvider timeProvider,
|
||||
Optional<byte[]> authorizationKey)
|
||||
{
|
||||
this.pendingAccounts = pendingAccounts;
|
||||
this.accounts = accounts;
|
||||
this.rateLimiters = rateLimiters;
|
||||
this.smsSender = smsSenderFactory;
|
||||
this.storedMessages = storedMessages;
|
||||
this.pendingAccounts = pendingAccounts;
|
||||
this.accounts = accounts;
|
||||
this.rateLimiters = rateLimiters;
|
||||
this.smsSender = smsSenderFactory;
|
||||
this.storedMessages = storedMessages;
|
||||
this.timeProvider = timeProvider;
|
||||
this.authorizationKey = authorizationKey;
|
||||
}
|
||||
|
||||
@Timed
|
||||
@@ -145,30 +153,46 @@ public class AccountController {
|
||||
throw new WebApplicationException(Response.status(417).build());
|
||||
}
|
||||
|
||||
Device device = new Device();
|
||||
device.setId(Device.MASTER_ID);
|
||||
device.setAuthenticationCredentials(new AuthenticationCredentials(password));
|
||||
device.setSignalingKey(accountAttributes.getSignalingKey());
|
||||
device.setFetchesMessages(accountAttributes.getFetchesMessages());
|
||||
device.setRegistrationId(accountAttributes.getRegistrationId());
|
||||
|
||||
Account account = new Account();
|
||||
account.setNumber(number);
|
||||
account.setSupportsSms(accountAttributes.getSupportsSms());
|
||||
account.addDevice(device);
|
||||
|
||||
accounts.create(account);
|
||||
storedMessages.clear(new WebsocketAddress(number, Device.MASTER_ID));
|
||||
pendingAccounts.remove(number);
|
||||
|
||||
logger.debug("Stored device...");
|
||||
createAccount(number, password, accountAttributes);
|
||||
} catch (InvalidAuthorizationHeaderException e) {
|
||||
logger.info("Bad Authorization Header", e);
|
||||
throw new WebApplicationException(Response.status(401).build());
|
||||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
@PUT
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Path("/token/{verification_token}")
|
||||
public void verifyToken(@PathParam("verification_token") String verificationToken,
|
||||
@HeaderParam("Authorization") String authorizationHeader,
|
||||
@Valid AccountAttributes accountAttributes)
|
||||
throws RateLimitExceededException
|
||||
{
|
||||
try {
|
||||
AuthorizationHeader header = AuthorizationHeader.fromFullHeader(authorizationHeader);
|
||||
String number = header.getNumber();
|
||||
String password = header.getPassword();
|
||||
|
||||
rateLimiters.getVerifyLimiter().validate(number);
|
||||
|
||||
if (!authorizationKey.isPresent()) {
|
||||
logger.debug("Attempt to authorize with key but not configured...");
|
||||
throw new WebApplicationException(Response.status(403).build());
|
||||
}
|
||||
|
||||
AuthorizationToken token = new AuthorizationToken(verificationToken, authorizationKey.get());
|
||||
|
||||
if (!token.isValid(number, timeProvider.getCurrentTimeMillis())) {
|
||||
throw new WebApplicationException(Response.status(403).build());
|
||||
}
|
||||
|
||||
createAccount(number, password, accountAttributes);
|
||||
} catch (InvalidAuthorizationHeaderException e) {
|
||||
logger.info("Bad authorization header", e);
|
||||
throw new WebApplicationException(Response.status(401).build());
|
||||
}
|
||||
}
|
||||
|
||||
@Timed
|
||||
@PUT
|
||||
@@ -219,6 +243,26 @@ public class AccountController {
|
||||
encodedVerificationText)).build();
|
||||
}
|
||||
|
||||
private void createAccount(String number, String password, AccountAttributes accountAttributes) {
|
||||
Device device = new Device();
|
||||
device.setId(Device.MASTER_ID);
|
||||
device.setAuthenticationCredentials(new AuthenticationCredentials(password));
|
||||
device.setSignalingKey(accountAttributes.getSignalingKey());
|
||||
device.setFetchesMessages(accountAttributes.getFetchesMessages());
|
||||
device.setRegistrationId(accountAttributes.getRegistrationId());
|
||||
|
||||
Account account = new Account();
|
||||
account.setNumber(number);
|
||||
account.setSupportsSms(accountAttributes.getSupportsSms());
|
||||
account.addDevice(device);
|
||||
|
||||
accounts.create(account);
|
||||
storedMessages.clear(new WebsocketAddress(number, Device.MASTER_ID));
|
||||
pendingAccounts.remove(number);
|
||||
|
||||
logger.debug("Stored device...");
|
||||
}
|
||||
|
||||
@VisibleForTesting protected VerificationCode generateVerificationCode() {
|
||||
try {
|
||||
SecureRandom random = SecureRandom.getInstance("SHA1PRNG");
|
||||
|
||||
@@ -1,263 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.controllers;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import org.eclipse.jetty.websocket.api.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.api.WebSocketListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.IncomingWebsocketMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubListener;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketMessage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.dropwizard.auth.AuthenticationException;
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
|
||||
|
||||
public class WebsocketController implements WebSocketListener, PubSubListener {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketController.class);
|
||||
private static final ObjectMapper mapper = SystemMapper.getMapper();
|
||||
private static final Map<Long, PendingMessage> pendingMessages = new HashMap<>();
|
||||
|
||||
private final AccountAuthenticator accountAuthenticator;
|
||||
private final AccountsManager accountsManager;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final StoredMessages storedMessages;
|
||||
private final PushSender pushSender;
|
||||
|
||||
private WebsocketAddress address;
|
||||
private Account account;
|
||||
private Device device;
|
||||
private Session session;
|
||||
|
||||
private long pendingMessageSequence;
|
||||
|
||||
public WebsocketController(AccountAuthenticator accountAuthenticator,
|
||||
AccountsManager accountsManager,
|
||||
PushSender pushSender,
|
||||
PubSubManager pubSubManager,
|
||||
StoredMessages storedMessages)
|
||||
{
|
||||
this.accountAuthenticator = accountAuthenticator;
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.storedMessages = storedMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(Session session) {
|
||||
try {
|
||||
UpgradeRequest request = session.getUpgradeRequest();
|
||||
Map<String, String[]> parameters = request.getParameterMap();
|
||||
String[] usernames = parameters.get("login" );
|
||||
String[] passwords = parameters.get("password");
|
||||
|
||||
if (usernames == null || usernames.length == 0 ||
|
||||
passwords == null || passwords.length == 0)
|
||||
{
|
||||
session.close(new CloseStatus(4001, "Unauthorized"));
|
||||
return;
|
||||
}
|
||||
|
||||
BasicCredentials credentials = new BasicCredentials(usernames[0], passwords[0]);
|
||||
Optional<Account> account = accountAuthenticator.authenticate(credentials);
|
||||
|
||||
if (!account.isPresent()) {
|
||||
session.close(new CloseStatus(4001, "Unauthorized"));
|
||||
return;
|
||||
}
|
||||
|
||||
this.account = account.get();
|
||||
this.device = account.get().getAuthenticatedDevice().get();
|
||||
this.address = new WebsocketAddress(this.account.getNumber(), this.device.getId());
|
||||
this.session = session;
|
||||
|
||||
this.session.setIdleTimeout(10 * 60 * 1000);
|
||||
this.pubSubManager.subscribe(this.address, this);
|
||||
|
||||
handleQueryDatabase();
|
||||
} catch (AuthenticationException e) {
|
||||
try { session.close(1011, "Server Error");} catch (IOException e1) {}
|
||||
} catch (IOException ioe) {
|
||||
logger.info("Abrupt session close.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketText(String body) {
|
||||
try {
|
||||
IncomingWebsocketMessage incomingMessage = mapper.readValue(body, IncomingWebsocketMessage.class);
|
||||
|
||||
switch (incomingMessage.getType()) {
|
||||
case IncomingWebsocketMessage.TYPE_ACKNOWLEDGE_MESSAGE:
|
||||
handleMessageAck(body);
|
||||
break;
|
||||
default:
|
||||
close(new CloseStatus(1008, "Unknown Type"));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.debug("Parse", e);
|
||||
close(new CloseStatus(1008, "Badly Formatted"));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketClose(int i, String s) {
|
||||
pubSubManager.unsubscribe(this.address, this);
|
||||
|
||||
List<PendingMessage> remainingMessages = new LinkedList<>();
|
||||
|
||||
synchronized (pendingMessages) {
|
||||
Long[] pendingKeys = pendingMessages.keySet().toArray(new Long[0]);
|
||||
Arrays.sort(pendingKeys);
|
||||
|
||||
for (long pendingKey : pendingKeys) {
|
||||
remainingMessages.add(pendingMessages.get(pendingKey));
|
||||
}
|
||||
|
||||
pendingMessages.clear();
|
||||
}
|
||||
|
||||
for (PendingMessage remainingMessage : remainingMessages) {
|
||||
try {
|
||||
pushSender.sendMessage(account, device, remainingMessage);
|
||||
} catch (NotPushRegisteredException | TransientPushFailureException e) {
|
||||
logger.warn("onWebSocketClose", e);
|
||||
storedMessages.insert(address, remainingMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPubSubMessage(PubSubMessage outgoingMessage) {
|
||||
switch (outgoingMessage.getType()) {
|
||||
case PubSubMessage.TYPE_DELIVER:
|
||||
try {
|
||||
PendingMessage pendingMessage = mapper.readValue(outgoingMessage.getContents(), PendingMessage.class);
|
||||
handleDeliverOutgoingMessage(pendingMessage);
|
||||
} catch (IOException e) {
|
||||
logger.warn("WebsocketController", "Error deserializing PendingMessage", e);
|
||||
}
|
||||
break;
|
||||
case PubSubMessage.TYPE_QUERY_DB:
|
||||
handleQueryDatabase();
|
||||
break;
|
||||
default:
|
||||
logger.warn("Unknown pubsub message: " + outgoingMessage.getType());
|
||||
}
|
||||
}
|
||||
|
||||
private void handleDeliverOutgoingMessage(PendingMessage message) {
|
||||
try {
|
||||
long messageSequence;
|
||||
|
||||
synchronized (pendingMessages) {
|
||||
messageSequence = pendingMessageSequence++;
|
||||
pendingMessages.put(messageSequence, message);
|
||||
}
|
||||
|
||||
WebsocketMessage websocketMessage = new WebsocketMessage(messageSequence, message.getEncryptedOutgoingMessage());
|
||||
session.getRemote().sendStringByFuture(mapper.writeValueAsString(websocketMessage));
|
||||
} catch (IOException e) {
|
||||
logger.debug("Response failed", e);
|
||||
close(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMessageAck(String message) {
|
||||
try {
|
||||
AcknowledgeWebsocketMessage ack = mapper.readValue(message, AcknowledgeWebsocketMessage.class);
|
||||
PendingMessage acknowledgedMessage;
|
||||
|
||||
synchronized (pendingMessages) {
|
||||
acknowledgedMessage = pendingMessages.remove(ack.getId());
|
||||
}
|
||||
|
||||
if (acknowledgedMessage != null && !acknowledgedMessage.isReceipt()) {
|
||||
sendDeliveryReceipt(acknowledgedMessage);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
logger.warn("Mapping", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleQueryDatabase() {
|
||||
List<PendingMessage> messages = storedMessages.getMessagesForDevice(address);
|
||||
|
||||
for (PendingMessage message : messages) {
|
||||
handleDeliverOutgoingMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendDeliveryReceipt(PendingMessage acknowledgedMessage) {
|
||||
try {
|
||||
Optional<Account> source = accountsManager.get(acknowledgedMessage.getSender());
|
||||
|
||||
if (!source.isPresent()) {
|
||||
logger.warn("Source account disappeared? (%s)", acknowledgedMessage.getSender());
|
||||
return;
|
||||
}
|
||||
|
||||
OutgoingMessageSignal.Builder receipt =
|
||||
OutgoingMessageSignal.newBuilder()
|
||||
.setSource(account.getNumber())
|
||||
.setSourceDevice((int) device.getId())
|
||||
.setTimestamp(acknowledgedMessage.getMessageId())
|
||||
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
|
||||
|
||||
for (Device device : source.get().getDevices()) {
|
||||
pushSender.sendMessage(source.get(), device, receipt.build());
|
||||
}
|
||||
} catch (NotPushRegisteredException | TransientPushFailureException e) {
|
||||
logger.warn("Websocket", "Delivery receipet", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketBinary(byte[] bytes, int i, int i2) {
|
||||
logger.info("Received binary message!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketError(Throwable throwable) {
|
||||
logger.info("onWebSocketError", throwable);
|
||||
}
|
||||
|
||||
|
||||
private void close(CloseStatus closeStatus) {
|
||||
try {
|
||||
if (this.session != null) {
|
||||
if (closeStatus != null) this.session.close(closeStatus);
|
||||
else this.session.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.info("close()", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.hibernate.validator.constraints.NotEmpty;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
public class ApnMessage {
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String apnId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String number;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int deviceId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String message;
|
||||
|
||||
public ApnMessage() {}
|
||||
|
||||
public ApnMessage(String apnId, String number, int deviceId, String message) {
|
||||
this.apnId = apnId;
|
||||
this.number = number;
|
||||
this.deviceId = deviceId;
|
||||
this.message = message;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.hibernate.validator.constraints.NotEmpty;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
public class GcmMessage {
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String gcmId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String number;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int deviceId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String message;
|
||||
|
||||
@JsonProperty
|
||||
private boolean receipt;
|
||||
|
||||
public GcmMessage() {}
|
||||
|
||||
public GcmMessage(String gcmId, String number, int deviceId, String message, boolean receipt) {
|
||||
this.gcmId = gcmId;
|
||||
this.number = number;
|
||||
this.deviceId = deviceId;
|
||||
this.message = message;
|
||||
this.receipt = receipt;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.hibernate.validator.constraints.NotEmpty;
|
||||
|
||||
import javax.validation.constraints.Min;
|
||||
|
||||
public class UnregisteredEvent {
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String registrationId;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String number;
|
||||
|
||||
@JsonProperty
|
||||
@Min(1)
|
||||
private int deviceId;
|
||||
|
||||
@JsonProperty
|
||||
private long timestamp;
|
||||
|
||||
public String getRegistrationId() {
|
||||
return registrationId;
|
||||
}
|
||||
|
||||
public String getNumber() {
|
||||
return number;
|
||||
}
|
||||
|
||||
public int getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
public class UnregisteredEventList {
|
||||
|
||||
@JsonProperty
|
||||
private List<UnregisteredEvent> devices;
|
||||
|
||||
public List<UnregisteredEvent> getDevices() {
|
||||
if (devices == null) return new LinkedList<>();
|
||||
else return devices;
|
||||
}
|
||||
}
|
||||
@@ -56,7 +56,7 @@ public class JsonMetricsReporter extends ScheduledReporter {
|
||||
SortedMap<String, Timer> stringTimerSortedMap)
|
||||
{
|
||||
try {
|
||||
logger.info("Reporting metrics...");
|
||||
logger.debug("Reporting metrics...");
|
||||
URL url = new URL("https", sunnylabsHost, 443, "/report/metrics?t=" + table + "&h=" + host);
|
||||
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
|
||||
|
||||
@@ -93,7 +93,7 @@ public class JsonMetricsReporter extends ScheduledReporter {
|
||||
|
||||
outputStream.close();
|
||||
|
||||
logger.info("Metrics server response: " + connection.getResponseCode());
|
||||
logger.debug("Metrics server response: " + connection.getResponseCode());
|
||||
} catch (IOException e) {
|
||||
logger.warn("Error sending metrics", e);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
package org.whispersystems.textsecuregcm.providers;
|
||||
|
||||
public class TimeProvider {
|
||||
public long getCurrentTimeMillis() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
@@ -1,248 +0,0 @@
|
||||
/**
|
||||
* Copyright (C) 2013 Open WhisperSystems
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.notnoop.apns.APNS;
|
||||
import com.notnoop.apns.ApnsService;
|
||||
import com.notnoop.exceptions.NetworkIOException;
|
||||
import net.spy.memcached.MemcachedClient;
|
||||
import org.bouncycastle.openssl.PEMReader;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.util.Constants;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.Certificate;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
|
||||
public class APNSender implements Managed {
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Meter websocketMeter = metricRegistry.meter(name(getClass(), "websocket"));
|
||||
private final Meter pushMeter = metricRegistry.meter(name(getClass(), "push"));
|
||||
private final Meter failureMeter = metricRegistry.meter(name(getClass(), "failure"));
|
||||
private final Logger logger = LoggerFactory.getLogger(APNSender.class);
|
||||
|
||||
private static final String MESSAGE_BODY = "m";
|
||||
|
||||
private static final ObjectMapper mapper = SystemMapper.getMapper();
|
||||
|
||||
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
private final AccountsManager accounts;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final StoredMessages storedMessages;
|
||||
private final MemcachedClient memcachedClient;
|
||||
|
||||
private final String apnCertificate;
|
||||
private final String apnKey;
|
||||
|
||||
private Optional<ApnsService> apnService;
|
||||
|
||||
public APNSender(AccountsManager accounts,
|
||||
PubSubManager pubSubManager,
|
||||
StoredMessages storedMessages,
|
||||
MemcachedClient memcachedClient,
|
||||
String apnCertificate, String apnKey)
|
||||
{
|
||||
this.accounts = accounts;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.storedMessages = storedMessages;
|
||||
this.apnCertificate = apnCertificate;
|
||||
this.apnKey = apnKey;
|
||||
this.memcachedClient = memcachedClient;
|
||||
}
|
||||
|
||||
public void sendMessage(Account account, Device device,
|
||||
String registrationId, PendingMessage message)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
try {
|
||||
String serializedPendingMessage = mapper.writeValueAsString(message);
|
||||
WebsocketAddress websocketAddress = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
|
||||
if (pubSubManager.publish(websocketAddress, new PubSubMessage(PubSubMessage.TYPE_DELIVER,
|
||||
serializedPendingMessage)))
|
||||
{
|
||||
websocketMeter.mark();
|
||||
} else {
|
||||
memcacheSet(registrationId, account.getNumber());
|
||||
storedMessages.insert(websocketAddress, message);
|
||||
|
||||
if (!message.isReceipt()) {
|
||||
sendPush(registrationId, serializedPendingMessage);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new TransientPushFailureException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendPush(String registrationId, String message)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
try {
|
||||
if (!apnService.isPresent()) {
|
||||
failureMeter.mark();
|
||||
throw new TransientPushFailureException("APN access not configured!");
|
||||
}
|
||||
|
||||
String payload = APNS.newPayload()
|
||||
.alertBody("Message!")
|
||||
.customField(MESSAGE_BODY, message)
|
||||
.build();
|
||||
|
||||
logger.debug("APN Payload: " + payload);
|
||||
|
||||
apnService.get().push(registrationId, payload);
|
||||
pushMeter.mark();
|
||||
} catch (NetworkIOException nioe) {
|
||||
logger.warn("Network Error", nioe);
|
||||
failureMeter.mark();
|
||||
throw new TransientPushFailureException(nioe);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static byte[] initializeKeyStore(String pemCertificate, String pemKey)
|
||||
throws KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException
|
||||
{
|
||||
PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemCertificate.getBytes())));
|
||||
X509Certificate certificate = (X509Certificate) reader.readObject();
|
||||
Certificate[] certificateChain = {certificate};
|
||||
|
||||
reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(pemKey.getBytes())));
|
||||
KeyPair keyPair = (KeyPair) reader.readObject();
|
||||
|
||||
KeyStore keyStore = KeyStore.getInstance("pkcs12");
|
||||
keyStore.load(null);
|
||||
keyStore.setEntry("apn",
|
||||
new KeyStore.PrivateKeyEntry(keyPair.getPrivate(), certificateChain),
|
||||
new KeyStore.PasswordProtection("insecure".toCharArray()));
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
keyStore.store(baos, "insecure".toCharArray());
|
||||
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
if (!Util.isEmpty(apnCertificate) && !Util.isEmpty(apnKey)) {
|
||||
byte[] keyStore = initializeKeyStore(apnCertificate, apnKey);
|
||||
|
||||
this.apnService = Optional.of(APNS.newService()
|
||||
.withCert(new ByteArrayInputStream(keyStore), "insecure")
|
||||
.asQueued()
|
||||
.withSandboxDestination().build());
|
||||
|
||||
this.executor.scheduleAtFixedRate(new FeedbackRunnable(), 0, 1, TimeUnit.HOURS);
|
||||
} else {
|
||||
this.apnService = Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
if (apnService.isPresent()) {
|
||||
apnService.get().stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void memcacheSet(String registrationId, String number) {
|
||||
if (memcachedClient != null) {
|
||||
memcachedClient.set("APN-" + registrationId, 60 * 60 * 24, number);
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<String> memcacheGet(String registrationId) {
|
||||
if (memcachedClient != null) {
|
||||
return Optional.fromNullable((String)memcachedClient.get("APN-" + registrationId));
|
||||
} else {
|
||||
return Optional.absent();
|
||||
}
|
||||
}
|
||||
|
||||
private class FeedbackRunnable implements Runnable {
|
||||
private void updateAccount(Account account, String registrationId) {
|
||||
boolean needsUpdate = false;
|
||||
|
||||
for (Device device : account.getDevices()) {
|
||||
if (registrationId.equals(device.getApnId())) {
|
||||
needsUpdate = true;
|
||||
device.setApnId(null);
|
||||
}
|
||||
}
|
||||
|
||||
if (needsUpdate) {
|
||||
accounts.update(account);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (apnService.isPresent()) {
|
||||
Map<String, Date> inactiveDevices = apnService.get().getInactiveDevices();
|
||||
|
||||
for (String registrationId : inactiveDevices.keySet()) {
|
||||
Optional<String> number = memcacheGet(registrationId);
|
||||
|
||||
if (number.isPresent()) {
|
||||
Optional<Account> account = accounts.get(number.get());
|
||||
|
||||
if (account.isPresent()) {
|
||||
updateAccount(account.get(), registrationId);
|
||||
}
|
||||
} else {
|
||||
logger.warn("APN unregister event received for uncached ID: " + registrationId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.UnregisteredEvent;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
|
||||
public class FeedbackHandler implements Managed, Runnable {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class);
|
||||
|
||||
private final PushServiceClient client;
|
||||
private final AccountsManager accountsManager;
|
||||
|
||||
private ScheduledExecutorService executor;
|
||||
|
||||
public FeedbackHandler(PushServiceClient client, AccountsManager accountsManager) {
|
||||
this.client = client;
|
||||
this.accountsManager = accountsManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
this.executor = Executors.newSingleThreadScheduledExecutor();
|
||||
this.executor.scheduleAtFixedRate(this, 0, 1, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
if (this.executor != null) {
|
||||
this.executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
List<UnregisteredEvent> gcmFeedback = client.getGcmFeedback();
|
||||
List<UnregisteredEvent> apnFeedback = client.getApnFeedback();
|
||||
|
||||
for (UnregisteredEvent gcmEvent : gcmFeedback) {
|
||||
handleGcmUnregistered(gcmEvent);
|
||||
}
|
||||
|
||||
for (UnregisteredEvent apnEvent : apnFeedback) {
|
||||
handleApnUnregistered(apnEvent);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("Error retrieving feedback: ", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleGcmUnregistered(UnregisteredEvent event) {
|
||||
logger.info("Got GCM Unregistered: " + event.getNumber() + "," + event.getDeviceId());
|
||||
|
||||
Optional<Account> account = accountsManager.get(event.getNumber());
|
||||
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(event.getDeviceId());
|
||||
|
||||
if (device.isPresent()) {
|
||||
if (event.getRegistrationId().equals(device.get().getGcmId())) {
|
||||
logger.info("GCM Unregister GCM ID matches!");
|
||||
if (device.get().getPushTimestamp() == 0 ||
|
||||
event.getTimestamp() > device.get().getPushTimestamp())
|
||||
{
|
||||
logger.info("GCM Unregister Timestamp matches!");
|
||||
device.get().setGcmId(null);
|
||||
accountsManager.update(account.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleApnUnregistered(UnregisteredEvent event) {
|
||||
logger.info("Got APN Unregistered: " + event.getNumber() + "," + event.getDeviceId());
|
||||
|
||||
Optional<Account> account = accountsManager.get(event.getNumber());
|
||||
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(event.getDeviceId());
|
||||
|
||||
if (device.isPresent()) {
|
||||
if (event.getRegistrationId().equals(device.get().getApnId())) {
|
||||
logger.info("APN Unregister APN ID matches!");
|
||||
if (device.get().getPushTimestamp() == 0 ||
|
||||
event.getTimestamp() > device.get().getPushTimestamp())
|
||||
{
|
||||
logger.info("APN Unregister timestamp matches!");
|
||||
device.get().setApnId(null);
|
||||
accountsManager.update(account.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,424 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.google.common.base.Optional;
|
||||
import org.jivesoftware.smack.ConnectionConfiguration;
|
||||
import org.jivesoftware.smack.ConnectionListener;
|
||||
import org.jivesoftware.smack.PacketListener;
|
||||
import org.jivesoftware.smack.SmackException;
|
||||
import org.jivesoftware.smack.XMPPConnection;
|
||||
import org.jivesoftware.smack.XMPPException;
|
||||
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
||||
import org.jivesoftware.smack.packet.DefaultPacketExtension;
|
||||
import org.jivesoftware.smack.packet.Message;
|
||||
import org.jivesoftware.smack.packet.Packet;
|
||||
import org.jivesoftware.smack.packet.PacketExtension;
|
||||
import org.jivesoftware.smack.provider.PacketExtensionProvider;
|
||||
import org.jivesoftware.smack.provider.ProviderManager;
|
||||
import org.jivesoftware.smack.tcp.XMPPTCPConnection;
|
||||
import org.jivesoftware.smack.util.StringUtils;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.json.simple.JSONValue;
|
||||
import org.json.simple.parser.ParseException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import org.xmlpull.v1.XmlPullParser;
|
||||
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
import io.dropwizard.lifecycle.Managed;
|
||||
|
||||
public class GCMSender implements Managed, PacketListener {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(GCMSender.class);
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(org.whispersystems.textsecuregcm.util.Constants.METRICS_NAME);
|
||||
private final Meter success = metricRegistry.meter(name(getClass(), "sent", "success"));
|
||||
private final Meter failure = metricRegistry.meter(name(getClass(), "sent", "failure"));
|
||||
private final Meter unregistered = metricRegistry.meter(name(getClass(), "sent", "unregistered"));
|
||||
|
||||
private static final String GCM_SERVER = "gcm.googleapis.com";
|
||||
private static final int GCM_PORT = 5235;
|
||||
|
||||
private static final String GCM_ELEMENT_NAME = "gcm";
|
||||
private static final String GCM_NAMESPACE = "google:mobile:data";
|
||||
|
||||
private final Map<String, UnacknowledgedMessage> pendingMessages = new ConcurrentHashMap<>();
|
||||
|
||||
private final long senderId;
|
||||
private final String apiKey;
|
||||
private final AccountsManager accounts;
|
||||
|
||||
private XMPPTCPConnection connection;
|
||||
|
||||
public GCMSender(AccountsManager accounts, long senderId, String apiKey) {
|
||||
this.accounts = accounts;
|
||||
this.senderId = senderId;
|
||||
this.apiKey = apiKey;
|
||||
|
||||
ProviderManager.addExtensionProvider(GCM_ELEMENT_NAME, GCM_NAMESPACE,
|
||||
new GcmPacketExtensionProvider());
|
||||
}
|
||||
|
||||
public void sendMessage(String destinationNumber, long destinationDeviceId,
|
||||
String registrationId, PendingMessage message)
|
||||
{
|
||||
String messageId = "m-" + UUID.randomUUID().toString();
|
||||
UnacknowledgedMessage unacknowledgedMessage = new UnacknowledgedMessage(destinationNumber,
|
||||
destinationDeviceId,
|
||||
registrationId, message);
|
||||
|
||||
sendMessage(messageId, unacknowledgedMessage);
|
||||
}
|
||||
|
||||
public void sendMessage(String messageId, UnacknowledgedMessage message) {
|
||||
try {
|
||||
boolean isReceipt = message.getPendingMessage().isReceipt();
|
||||
|
||||
Map<String, String> dataObject = new HashMap<>();
|
||||
dataObject.put("type", "message");
|
||||
dataObject.put(isReceipt ? "receipt" : "message", message.getPendingMessage().getEncryptedOutgoingMessage());
|
||||
|
||||
Map<String, Object> messageObject = new HashMap<>();
|
||||
messageObject.put("to", message.getRegistrationId());
|
||||
messageObject.put("message_id", messageId);
|
||||
messageObject.put("data", dataObject);
|
||||
|
||||
String json = JSONObject.toJSONString(messageObject);
|
||||
|
||||
pendingMessages.put(messageId, message);
|
||||
connection.sendPacket(new GcmPacketExtension(json).toPacket());
|
||||
} catch (SmackException.NotConnectedException e) {
|
||||
logger.warn("GCMClient", "No connection", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
this.connection = connect(senderId, apiKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
this.connection.disconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processPacket(Packet packet) throws SmackException.NotConnectedException {
|
||||
Message incomingMessage = (Message) packet;
|
||||
GcmPacketExtension gcmPacket = (GcmPacketExtension) incomingMessage.getExtension(GCM_NAMESPACE);
|
||||
String json = gcmPacket.getJson();
|
||||
|
||||
try {
|
||||
Map<String, Object> jsonObject = (Map<String, Object>) JSONValue.parseWithException(json);
|
||||
Object messageType = jsonObject.get("message_type");
|
||||
|
||||
if (messageType == null) {
|
||||
handleUpstreamMessage(jsonObject);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (messageType.toString()) {
|
||||
case "ack" : handleAckReceipt(jsonObject); break;
|
||||
case "nack" : handleNackReceipt(jsonObject); break;
|
||||
case "receipt" : handleDeliveryReceipt(jsonObject); break;
|
||||
case "control" : handleControlMessage(jsonObject); break;
|
||||
default:
|
||||
logger.warn("Received unknown GCM message: " + messageType.toString());
|
||||
}
|
||||
|
||||
} catch (ParseException e) {
|
||||
logger.warn("GCMClient", "Received unparsable message", e);
|
||||
} catch (Exception e) {
|
||||
logger.warn("GCMClient", "Failed to process packet", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleControlMessage(Map<String, Object> message) {
|
||||
String controlType = (String) message.get("control_type");
|
||||
|
||||
if ("CONNECTION_DRAINING".equals(controlType)) {
|
||||
logger.warn("GCM Connection is draining! Initiating reconnect...");
|
||||
reconnect();
|
||||
} else {
|
||||
logger.warn("Received unknown GCM control message: " + controlType);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleDeliveryReceipt(Map<String, Object> message) {
|
||||
logger.warn("Got delivery receipt!");
|
||||
}
|
||||
|
||||
private void handleNackReceipt(Map<String, Object> message) {
|
||||
String messageId = (String) message.get("message_id");
|
||||
String errorCode = (String) message.get("error");
|
||||
|
||||
if (errorCode == null) {
|
||||
logger.warn("Null GCM error code!");
|
||||
if (messageId != null) {
|
||||
pendingMessages.remove(messageId);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (errorCode) {
|
||||
case "BAD_REGISTRATION" : handleBadRegistration(message); break;
|
||||
case "DEVICE_UNREGISTERED" : handleBadRegistration(message); break;
|
||||
case "INTERNAL_SERVER_ERROR" : handleServerFailure(message); break;
|
||||
case "INVALID_JSON" : handleClientFailure(message); break;
|
||||
case "QUOTA_EXCEEDED" : handleClientFailure(message); break;
|
||||
case "SERVICE_UNAVAILABLE" : handleServerFailure(message); break;
|
||||
}
|
||||
}
|
||||
|
||||
private void handleAckReceipt(Map<String, Object> message) {
|
||||
success.mark();
|
||||
|
||||
String messageId = (String) message.get("message_id");
|
||||
|
||||
if (messageId != null) {
|
||||
pendingMessages.remove(messageId);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleUpstreamMessage(Map<String, Object> message)
|
||||
throws SmackException.NotConnectedException
|
||||
{
|
||||
logger.warn("Got upstream message from GCM Server!");
|
||||
|
||||
for (String key : message.keySet()) {
|
||||
logger.warn(key + " : " + message.get(key));
|
||||
}
|
||||
|
||||
Map<String, Object> ack = new HashMap<>();
|
||||
message.put("message_type", "ack");
|
||||
message.put("to", message.get("from"));
|
||||
message.put("message_id", message.get("message_id"));
|
||||
|
||||
String json = JSONValue.toJSONString(ack);
|
||||
|
||||
Packet request = new GcmPacketExtension(json).toPacket();
|
||||
connection.sendPacket(request);
|
||||
}
|
||||
|
||||
private void handleBadRegistration(Map<String, Object> message) {
|
||||
unregistered.mark();
|
||||
|
||||
String messageId = (String) message.get("message_id");
|
||||
|
||||
if (messageId != null) {
|
||||
UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId);
|
||||
|
||||
if (unacknowledgedMessage != null) {
|
||||
Optional<Account> account = accounts.get(unacknowledgedMessage.getDestinationNumber());
|
||||
|
||||
if (account.isPresent()) {
|
||||
Optional<Device> device = account.get().getDevice(unacknowledgedMessage.getDestinationDeviceId());
|
||||
|
||||
if (device.isPresent()) {
|
||||
device.get().setGcmId(null);
|
||||
accounts.update(account.get());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleServerFailure(Map<String, Object> message) {
|
||||
failure.mark();
|
||||
|
||||
String messageId = (String)message.get("message_id");
|
||||
|
||||
if (messageId != null) {
|
||||
UnacknowledgedMessage unacknowledgedMessage = pendingMessages.remove(messageId);
|
||||
|
||||
if (unacknowledgedMessage != null) {
|
||||
sendMessage(messageId, unacknowledgedMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleClientFailure(Map<String, Object> message) {
|
||||
failure.mark();
|
||||
|
||||
logger.warn("Unrecoverable error: " + message.get("error"));
|
||||
String messageId = (String)message.get("message_id");
|
||||
|
||||
if (messageId != null) {
|
||||
pendingMessages.remove(messageId);
|
||||
}
|
||||
}
|
||||
|
||||
private void reconnect() {
|
||||
try {
|
||||
this.connection.disconnect();
|
||||
} catch (SmackException.NotConnectedException e) {
|
||||
logger.warn("GCMClient", "Disconnect attempt", e);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
this.connection = connect(senderId, apiKey);
|
||||
return;
|
||||
} catch (XMPPException | IOException | SmackException e) {
|
||||
logger.warn("GCMClient", "Reconnecting", e);
|
||||
Util.sleep(1000);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private XMPPTCPConnection connect(long senderId, String apiKey)
|
||||
throws XMPPException, IOException, SmackException
|
||||
{
|
||||
ConnectionConfiguration config = new ConnectionConfiguration(GCM_SERVER, GCM_PORT);
|
||||
config.setSecurityMode(ConnectionConfiguration.SecurityMode.enabled);
|
||||
config.setReconnectionAllowed(true);
|
||||
config.setRosterLoadedAtLogin(false);
|
||||
config.setSendPresence(false);
|
||||
config.setSocketFactory(SSLSocketFactory.getDefault());
|
||||
|
||||
XMPPTCPConnection connection = new XMPPTCPConnection(config);
|
||||
connection.connect();
|
||||
|
||||
connection.addConnectionListener(new LoggingConnectionListener());
|
||||
connection.addPacketListener(this, new PacketTypeFilter(Message.class));
|
||||
|
||||
connection.login(senderId + "@gcm.googleapis.com", apiKey);
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
private static class GcmPacketExtensionProvider implements PacketExtensionProvider {
|
||||
@Override
|
||||
public PacketExtension parseExtension(XmlPullParser xmlPullParser) throws Exception {
|
||||
String json = xmlPullParser.nextText();
|
||||
return new GcmPacketExtension(json);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class GcmPacketExtension extends DefaultPacketExtension {
|
||||
|
||||
private final String json;
|
||||
|
||||
public GcmPacketExtension(String json) {
|
||||
super(GCM_ELEMENT_NAME, GCM_NAMESPACE);
|
||||
this.json = json;
|
||||
}
|
||||
|
||||
public String getJson() {
|
||||
return json;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toXML() {
|
||||
return String.format("<%s xmlns=\"%s\">%s</%s>", GCM_ELEMENT_NAME, GCM_NAMESPACE,
|
||||
StringUtils.escapeForXML(json), GCM_ELEMENT_NAME);
|
||||
}
|
||||
|
||||
public Packet toPacket() {
|
||||
Message message = new Message();
|
||||
message.addExtension(this);
|
||||
return message;
|
||||
}
|
||||
}
|
||||
|
||||
private class LoggingConnectionListener implements ConnectionListener {
|
||||
|
||||
@Override
|
||||
public void connected(XMPPConnection xmppConnection) {
|
||||
logger.warn("GCM XMPP Connected.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void authenticated(XMPPConnection xmppConnection) {
|
||||
logger.warn("GCM XMPP Authenticated.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reconnectionSuccessful() {
|
||||
logger.warn("GCM XMPP Reconnecting..");
|
||||
Iterator<Map.Entry<String, UnacknowledgedMessage>> iterator =
|
||||
pendingMessages.entrySet().iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
Map.Entry<String, UnacknowledgedMessage> entry = iterator.next();
|
||||
iterator.remove();
|
||||
|
||||
sendMessage(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reconnectionFailed(Exception e) {
|
||||
logger.warn("GCM XMPP Reconnection failed!", e);
|
||||
reconnect();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reconnectingIn(int seconds) {
|
||||
logger.warn(String.format("GCM XMPP Reconnecting in %d secs", seconds));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionClosedOnError(Exception e) {
|
||||
logger.warn("GCM XMPP Connection closed on error.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionClosed() {
|
||||
logger.warn("GCM XMPP Connection closed.");
|
||||
reconnect();
|
||||
}
|
||||
}
|
||||
|
||||
private static class UnacknowledgedMessage {
|
||||
private final String destinationNumber;
|
||||
private final long destinationDeviceId;
|
||||
|
||||
private final String registrationId;
|
||||
private final PendingMessage pendingMessage;
|
||||
|
||||
private UnacknowledgedMessage(String destinationNumber,
|
||||
long destinationDeviceId,
|
||||
String registrationId,
|
||||
PendingMessage pendingMessage)
|
||||
{
|
||||
this.destinationNumber = destinationNumber;
|
||||
this.destinationDeviceId = destinationDeviceId;
|
||||
this.registrationId = registrationId;
|
||||
this.pendingMessage = pendingMessage;
|
||||
}
|
||||
|
||||
private String getRegistrationId() {
|
||||
return registrationId;
|
||||
}
|
||||
|
||||
private PendingMessage getPendingMessage() {
|
||||
return pendingMessage;
|
||||
}
|
||||
|
||||
public String getDestinationNumber() {
|
||||
return destinationNumber;
|
||||
}
|
||||
|
||||
public long getDestinationDeviceId() {
|
||||
return destinationDeviceId;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,9 +18,10 @@ package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.CryptoEncodingException;
|
||||
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.entities.GcmMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
@@ -31,17 +32,14 @@ public class PushSender {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
|
||||
|
||||
private final GCMSender gcmSender;
|
||||
private final APNSender apnSender;
|
||||
private final WebsocketSender webSocketSender;
|
||||
private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"alert\":{\"loc-key\":\"APN_Message\"},\"content-available\":1,\"category\":\"Signal_Message\"}}";
|
||||
|
||||
public PushSender(GCMSender gcmClient,
|
||||
APNSender apnSender,
|
||||
WebsocketSender websocketSender)
|
||||
{
|
||||
this.gcmSender = gcmClient;
|
||||
this.apnSender = apnSender;
|
||||
this.webSocketSender = websocketSender;
|
||||
private final PushServiceClient pushServiceClient;
|
||||
private final WebsocketSender webSocketSender;
|
||||
|
||||
public PushSender(PushServiceClient pushServiceClient, WebsocketSender websocketSender) {
|
||||
this.pushServiceClient = pushServiceClient;
|
||||
this.webSocketSender = websocketSender;
|
||||
}
|
||||
|
||||
public void sendMessage(Account account, Device device, OutgoingMessageSignal message)
|
||||
@@ -71,22 +69,33 @@ public class PushSender {
|
||||
else throw new NotPushRegisteredException("No delivery possible!");
|
||||
}
|
||||
|
||||
private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage) {
|
||||
String number = account.getNumber();
|
||||
long deviceId = device.getId();
|
||||
String registrationId = device.getGcmId();
|
||||
private void sendGcmMessage(Account account, Device device, PendingMessage pendingMessage)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
String number = account.getNumber();
|
||||
long deviceId = device.getId();
|
||||
String registrationId = device.getGcmId();
|
||||
GcmMessage gcmMessage = new GcmMessage(registrationId, number, (int)deviceId,
|
||||
pendingMessage.getEncryptedOutgoingMessage(),
|
||||
pendingMessage.isReceipt() );
|
||||
|
||||
gcmSender.sendMessage(number, deviceId, registrationId, pendingMessage);
|
||||
pushServiceClient.send(gcmMessage);
|
||||
}
|
||||
|
||||
private void sendApnMessage(Account account, Device device, PendingMessage outgoingMessage)
|
||||
throws TransientPushFailureException
|
||||
{
|
||||
apnSender.sendMessage(account, device, device.getApnId(), outgoingMessage);
|
||||
boolean online = webSocketSender.sendMessage(account, device, outgoingMessage, true);
|
||||
|
||||
if (!online && !outgoingMessage.isReceipt()) {
|
||||
ApnMessage apnMessage = new ApnMessage(device.getApnId(), account.getNumber(),
|
||||
(int)device.getId(), APN_PAYLOAD);
|
||||
pushServiceClient.send(apnMessage);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendWebSocketMessage(Account account, Device device, PendingMessage outgoingMessage)
|
||||
{
|
||||
webSocketSender.sendMessage(account, device, outgoingMessage);
|
||||
webSocketSender.sendMessage(account, device, outgoingMessage, false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
package org.whispersystems.textsecuregcm.push;
|
||||
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.UniformInterfaceException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.configuration.PushConfiguration;
|
||||
import org.whispersystems.textsecuregcm.entities.ApnMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.GcmMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.UnregisteredEvent;
|
||||
import org.whispersystems.textsecuregcm.entities.UnregisteredEventList;
|
||||
import org.whispersystems.textsecuregcm.util.Base64;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class PushServiceClient {
|
||||
|
||||
private static final String PUSH_GCM_PATH = "/api/v1/push/gcm";
|
||||
private static final String PUSH_APN_PATH = "/api/v1/push/apn";
|
||||
|
||||
private static final String APN_FEEDBACK_PATH = "/api/v1/feedback/apn";
|
||||
private static final String GCM_FEEDBACK_PATH = "/api/v1/feedback/gcm";
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(PushServiceClient.class);
|
||||
|
||||
private final Client client;
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final String authorization;
|
||||
|
||||
public PushServiceClient(Client client, PushConfiguration config) {
|
||||
this.client = client;
|
||||
this.host = config.getHost();
|
||||
this.port = config.getPort();
|
||||
this.authorization = getAuthorizationHeader(config.getUsername(), config.getPassword());
|
||||
}
|
||||
|
||||
public void send(GcmMessage message) throws TransientPushFailureException {
|
||||
sendPush(PUSH_GCM_PATH, message);
|
||||
}
|
||||
|
||||
public void send(ApnMessage message) throws TransientPushFailureException {
|
||||
sendPush(PUSH_APN_PATH, message);
|
||||
}
|
||||
|
||||
public List<UnregisteredEvent> getGcmFeedback() throws IOException {
|
||||
return getFeedback(GCM_FEEDBACK_PATH);
|
||||
}
|
||||
|
||||
public List<UnregisteredEvent> getApnFeedback() throws IOException {
|
||||
return getFeedback(APN_FEEDBACK_PATH);
|
||||
}
|
||||
|
||||
private void sendPush(String path, Object entity) throws TransientPushFailureException {
|
||||
try {
|
||||
ClientResponse response = client.resource("http://" + host + ":" + port + path)
|
||||
.header("Authorization", authorization)
|
||||
.entity(entity, MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
if (response.getStatus() != 204 && response.getStatus() != 200) {
|
||||
logger.warn("PushServer response: " + response.getStatus() + " " + response.getStatusInfo().getReasonPhrase());
|
||||
throw new TransientPushFailureException("Bad response: " + response.getStatus());
|
||||
}
|
||||
} catch (UniformInterfaceException | ClientHandlerException e) {
|
||||
logger.warn("Push error: ", e);
|
||||
throw new TransientPushFailureException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<UnregisteredEvent> getFeedback(String path) throws IOException {
|
||||
try {
|
||||
UnregisteredEventList unregisteredEvents = client.resource("http://" + host + ":" + port + path)
|
||||
.header("Authorization", authorization)
|
||||
.get(UnregisteredEventList.class);
|
||||
|
||||
return unregisteredEvents.getDevices();
|
||||
} catch (UniformInterfaceException | ClientHandlerException e) {
|
||||
logger.warn("Request error:", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private String getAuthorizationHeader(String username, String password) {
|
||||
return "Basic " + Base64.encodeBytes((username + ":" + password).getBytes());
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.controllers.WebsocketController;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
@@ -38,11 +37,15 @@ import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
public class WebsocketSender {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketController.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebsocketSender.class);
|
||||
|
||||
private final MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
|
||||
private final Meter onlineMeter = metricRegistry.meter(name(getClass(), "online"));
|
||||
private final Meter offlineMeter = metricRegistry.meter(name(getClass(), "offline"));
|
||||
|
||||
private final Meter websocketOnlineMeter = metricRegistry.meter(name(getClass(), "ws_online" ));
|
||||
private final Meter websocketOfflineMeter = metricRegistry.meter(name(getClass(), "ws_offline" ));
|
||||
|
||||
private final Meter apnOnlineMeter = metricRegistry.meter(name(getClass(), "apn_online" ));
|
||||
private final Meter apnOfflineMeter = metricRegistry.meter(name(getClass(), "apn_offline"));
|
||||
|
||||
private static final ObjectMapper mapper = SystemMapper.getMapper();
|
||||
|
||||
@@ -54,21 +57,28 @@ public class WebsocketSender {
|
||||
this.pubSubManager = pubSubManager;
|
||||
}
|
||||
|
||||
public void sendMessage(Account account, Device device, PendingMessage pendingMessage) {
|
||||
public boolean sendMessage(Account account, Device device, PendingMessage pendingMessage, boolean apn) {
|
||||
try {
|
||||
String serialized = mapper.writeValueAsString(pendingMessage);
|
||||
WebsocketAddress address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
PubSubMessage pubSubMessage = new PubSubMessage(PubSubMessage.TYPE_DELIVER, serialized);
|
||||
|
||||
if (pubSubManager.publish(address, pubSubMessage)) {
|
||||
onlineMeter.mark();
|
||||
if (apn) apnOnlineMeter.mark();
|
||||
else websocketOnlineMeter.mark();
|
||||
|
||||
return true;
|
||||
} else {
|
||||
offlineMeter.mark();
|
||||
if (apn) apnOfflineMeter.mark();
|
||||
else websocketOfflineMeter.mark();
|
||||
|
||||
storedMessages.insert(address, pendingMessage);
|
||||
pubSubManager.publish(address, new PubSubMessage(PubSubMessage.TYPE_QUERY_DB, null));
|
||||
return false;
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
logger.warn("WebsocketSender", "Unable to serialize json", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,6 @@ import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
|
||||
import org.whispersystems.textsecuregcm.entities.SignedPreKey;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
public class Device {
|
||||
|
||||
public static final long MASTER_ID = 1;
|
||||
@@ -46,6 +44,9 @@ public class Device {
|
||||
@JsonProperty
|
||||
private String apnId;
|
||||
|
||||
@JsonProperty
|
||||
private long pushTimestamp;
|
||||
|
||||
@JsonProperty
|
||||
private boolean fetchesMessages;
|
||||
|
||||
@@ -79,6 +80,10 @@ public class Device {
|
||||
|
||||
public void setApnId(String apnId) {
|
||||
this.apnId = apnId;
|
||||
|
||||
if (apnId != null) {
|
||||
this.pushTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
public String getGcmId() {
|
||||
@@ -87,6 +92,10 @@ public class Device {
|
||||
|
||||
public void setGcmId(String gcmId) {
|
||||
this.gcmId = gcmId;
|
||||
|
||||
if (gcmId != null) {
|
||||
this.pushTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
@@ -145,4 +154,8 @@ public class Device {
|
||||
public void setSignedPreKey(SignedPreKey signedPreKey) {
|
||||
this.signedPreKey = signedPreKey;
|
||||
}
|
||||
|
||||
public long getPushTimestamp() {
|
||||
return pushTimestamp;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,65 @@
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||
import org.whispersystems.websocket.setup.WebSocketConnectListener;
|
||||
|
||||
public class ConnectListener implements WebSocketConnectListener {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushSender pushSender;
|
||||
private final StoredMessages storedMessages;
|
||||
private final PubSubManager pubSubManager;
|
||||
|
||||
public ConnectListener(AccountsManager accountsManager, PushSender pushSender,
|
||||
StoredMessages storedMessages, PubSubManager pubSubManager)
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.storedMessages = storedMessages;
|
||||
this.pubSubManager = pubSubManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWebSocketConnect(WebSocketSessionContext context) {
|
||||
Optional<Account> account = context.getAuthenticated(Account.class);
|
||||
|
||||
if (!account.isPresent()) {
|
||||
logger.debug("WS Connection with no authentication...");
|
||||
context.getClient().close(4001, "Authentication failed");
|
||||
return;
|
||||
}
|
||||
|
||||
Optional<Device> device = account.get().getAuthenticatedDevice();
|
||||
|
||||
if (!device.isPresent()) {
|
||||
logger.debug("WS Connection with no authenticated device...");
|
||||
context.getClient().close(4001, "Device authentication failed");
|
||||
return;
|
||||
}
|
||||
|
||||
final WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender,
|
||||
storedMessages, pubSubManager,
|
||||
account.get(), device.get(),
|
||||
context.getClient());
|
||||
|
||||
connection.onConnected();
|
||||
|
||||
context.addListener(new WebSocketSessionContext.WebSocketEventListener() {
|
||||
@Override
|
||||
public void onWebSocketClose(WebSocketSessionContext context, int statusCode, String reason) {
|
||||
connection.onConnectionLost();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.websocket.auth.AuthenticationException;
|
||||
import org.whispersystems.websocket.auth.WebSocketAuthenticator;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
|
||||
|
||||
public class WebSocketAccountAuthenticator implements WebSocketAuthenticator<Account> {
|
||||
|
||||
private final AccountAuthenticator accountAuthenticator;
|
||||
|
||||
public WebSocketAccountAuthenticator(AccountAuthenticator accountAuthenticator) {
|
||||
this.accountAuthenticator = accountAuthenticator;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Account> authenticate(UpgradeRequest request) throws AuthenticationException {
|
||||
try {
|
||||
Map<String, String[]> parameters = request.getParameterMap();
|
||||
String[] usernames = parameters.get("login");
|
||||
String[] passwords = parameters.get("password");
|
||||
|
||||
if (usernames == null || usernames.length == 0 ||
|
||||
passwords == null || passwords.length == 0)
|
||||
{
|
||||
return Optional.absent();
|
||||
}
|
||||
|
||||
BasicCredentials credentials = new BasicCredentials(usernames[0], passwords[0]);
|
||||
return accountAuthenticator.authenticate(credentials);
|
||||
} catch (io.dropwizard.auth.AuthenticationException e) {
|
||||
throw new AuthenticationException(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubListener;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubMessage;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.util.SystemMapper;
|
||||
import org.whispersystems.websocket.WebSocketClient;
|
||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
|
||||
|
||||
public class WebSocketConnection implements PubSubListener {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(WebSocketConnection.class);
|
||||
private static final ObjectMapper objectMapper = SystemMapper.getMapper();
|
||||
|
||||
private final AccountsManager accountsManager;
|
||||
private final PushSender pushSender;
|
||||
private final StoredMessages storedMessages;
|
||||
private final PubSubManager pubSubManager;
|
||||
|
||||
private final Account account;
|
||||
private final Device device;
|
||||
private final WebsocketAddress address;
|
||||
private final WebSocketClient client;
|
||||
|
||||
public WebSocketConnection(AccountsManager accountsManager,
|
||||
PushSender pushSender,
|
||||
StoredMessages storedMessages,
|
||||
PubSubManager pubSubManager,
|
||||
Account account,
|
||||
Device device,
|
||||
WebSocketClient client)
|
||||
{
|
||||
this.accountsManager = accountsManager;
|
||||
this.pushSender = pushSender;
|
||||
this.storedMessages = storedMessages;
|
||||
this.pubSubManager = pubSubManager;
|
||||
this.account = account;
|
||||
this.device = device;
|
||||
this.client = client;
|
||||
this.address = new WebsocketAddress(account.getNumber(), device.getId());
|
||||
}
|
||||
|
||||
public void onConnected() {
|
||||
pubSubManager.subscribe(address, this);
|
||||
processStoredMessages();
|
||||
}
|
||||
|
||||
public void onConnectionLost() {
|
||||
pubSubManager.unsubscribe(address, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPubSubMessage(PubSubMessage message) {
|
||||
try {
|
||||
switch (message.getType()) {
|
||||
case PubSubMessage.TYPE_QUERY_DB:
|
||||
processStoredMessages();
|
||||
break;
|
||||
case PubSubMessage.TYPE_DELIVER:
|
||||
PendingMessage pendingMessage = objectMapper.readValue(message.getContents(),
|
||||
PendingMessage.class);
|
||||
sendMessage(pendingMessage);
|
||||
break;
|
||||
default:
|
||||
logger.warn("Unknown pubsub message: " + message.getType());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn("Error deserializing PendingMessage", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMessage(final PendingMessage message) {
|
||||
String content = message.getEncryptedOutgoingMessage();
|
||||
Optional<byte[]> body = Optional.fromNullable(content.getBytes());
|
||||
ListenableFuture<WebSocketResponseMessage> response = client.sendRequest("PUT", "/api/v1/message", body);
|
||||
|
||||
Futures.addCallback(response, new FutureCallback<WebSocketResponseMessage>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable WebSocketResponseMessage response) {
|
||||
if (isSuccessResponse(response) && !message.isReceipt()) {
|
||||
sendDeliveryReceiptFor(message);
|
||||
} else if (!isSuccessResponse(response)) {
|
||||
requeueMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@Nonnull Throwable throwable) {
|
||||
requeueMessage(message);
|
||||
}
|
||||
|
||||
private boolean isSuccessResponse(WebSocketResponseMessage response) {
|
||||
return response != null && response.getStatus() >= 200 && response.getStatus() < 300;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void requeueMessage(PendingMessage message) {
|
||||
try {
|
||||
pushSender.sendMessage(account, device, message);
|
||||
} catch (NotPushRegisteredException | TransientPushFailureException e) {
|
||||
logger.warn("requeueMessage", e);
|
||||
storedMessages.insert(address, message);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendDeliveryReceiptFor(PendingMessage message) {
|
||||
try {
|
||||
Optional<Account> source = accountsManager.get(message.getSender());
|
||||
|
||||
if (!source.isPresent()) {
|
||||
logger.warn("Source account disappeared? (%s)", message.getSender());
|
||||
return;
|
||||
}
|
||||
|
||||
OutgoingMessageSignal.Builder receipt =
|
||||
OutgoingMessageSignal.newBuilder()
|
||||
.setSource(account.getNumber())
|
||||
.setSourceDevice((int) device.getId())
|
||||
.setTimestamp(message.getMessageId())
|
||||
.setType(OutgoingMessageSignal.Type.RECEIPT_VALUE);
|
||||
|
||||
for (Device device : source.get().getDevices()) {
|
||||
pushSender.sendMessage(source.get(), device, receipt.build());
|
||||
}
|
||||
} catch (NotPushRegisteredException | TransientPushFailureException e) {
|
||||
logger.warn("sendDeliveryReceiptFor", "Delivery receipet", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processStoredMessages() {
|
||||
List<PendingMessage> messages = storedMessages.getMessagesForDevice(address);
|
||||
|
||||
for (PendingMessage message : messages) {
|
||||
sendMessage(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,50 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeResponse;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
|
||||
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.controllers.WebsocketController;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
|
||||
|
||||
public class WebsocketControllerFactory extends WebSocketServlet implements WebSocketCreator {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(WebsocketControllerFactory.class);
|
||||
|
||||
private final PushSender pushSender;
|
||||
private final StoredMessages storedMessages;
|
||||
private final PubSubManager pubSubManager;
|
||||
private final AccountAuthenticator accountAuthenticator;
|
||||
private final AccountsManager accounts;
|
||||
|
||||
public WebsocketControllerFactory(AccountAuthenticator accountAuthenticator,
|
||||
AccountsManager accounts,
|
||||
PushSender pushSender,
|
||||
StoredMessages storedMessages,
|
||||
PubSubManager pubSubManager)
|
||||
{
|
||||
this.accountAuthenticator = accountAuthenticator;
|
||||
this.accounts = accounts;
|
||||
this.pushSender = pushSender;
|
||||
this.storedMessages = storedMessages;
|
||||
this.pubSubManager = pubSubManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(WebSocketServletFactory factory) {
|
||||
factory.setCreator(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object createWebSocket(UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse) {
|
||||
return new WebsocketController(accountAuthenticator, accounts, pushSender, pubSubManager, storedMessages);
|
||||
}
|
||||
}
|
||||
@@ -1,18 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.websocket;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class WebsocketMessage {
|
||||
|
||||
@JsonProperty
|
||||
private long id;
|
||||
|
||||
@JsonProperty
|
||||
private String message;
|
||||
|
||||
public WebsocketMessage(long id, String message) {
|
||||
this.id = id;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,6 +2,8 @@ package org.whispersystems.textsecuregcm.tests.controllers;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@@ -9,6 +11,7 @@ import org.whispersystems.textsecuregcm.controllers.AccountController;
|
||||
import org.whispersystems.textsecuregcm.entities.AccountAttributes;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiter;
|
||||
import org.whispersystems.textsecuregcm.limits.RateLimiters;
|
||||
import org.whispersystems.textsecuregcm.providers.TimeProvider;
|
||||
import org.whispersystems.textsecuregcm.sms.SmsSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
@@ -27,12 +30,14 @@ public class AccountControllerTest {
|
||||
|
||||
private static final String SENDER = "+14152222222";
|
||||
|
||||
private PendingAccountsManager pendingAccountsManager = mock(PendingAccountsManager.class);
|
||||
private AccountsManager accountsManager = mock(AccountsManager.class );
|
||||
private RateLimiters rateLimiters = mock(RateLimiters.class );
|
||||
private RateLimiter rateLimiter = mock(RateLimiter.class );
|
||||
private SmsSender smsSender = mock(SmsSender.class );
|
||||
private StoredMessages storedMessages = mock(StoredMessages.class );
|
||||
private PendingAccountsManager pendingAccountsManager = mock(PendingAccountsManager.class);
|
||||
private AccountsManager accountsManager = mock(AccountsManager.class );
|
||||
private RateLimiters rateLimiters = mock(RateLimiters.class );
|
||||
private RateLimiter rateLimiter = mock(RateLimiter.class );
|
||||
private SmsSender smsSender = mock(SmsSender.class );
|
||||
private StoredMessages storedMessages = mock(StoredMessages.class );
|
||||
private TimeProvider timeProvider = mock(TimeProvider.class );
|
||||
private static byte[] authorizationKey = decodeHex("3a078586eea8971155f5c1ebd73c8c923cbec1c3ed22a54722e4e88321dc749f");
|
||||
|
||||
@Rule
|
||||
public final ResourceTestRule resources = ResourceTestRule.builder()
|
||||
@@ -41,7 +46,9 @@ public class AccountControllerTest {
|
||||
accountsManager,
|
||||
rateLimiters,
|
||||
smsSender,
|
||||
storedMessages))
|
||||
storedMessages,
|
||||
timeProvider,
|
||||
Optional.of(authorizationKey)))
|
||||
.build();
|
||||
|
||||
|
||||
@@ -51,6 +58,8 @@ public class AccountControllerTest {
|
||||
when(rateLimiters.getVoiceDestinationLimiter()).thenReturn(rateLimiter);
|
||||
when(rateLimiters.getVerifyLimiter()).thenReturn(rateLimiter);
|
||||
|
||||
when(timeProvider.getCurrentTimeMillis()).thenReturn(System.currentTimeMillis());
|
||||
|
||||
when(pendingAccountsManager.getCodeForNumber(SENDER)).thenReturn(Optional.of("1234"));
|
||||
}
|
||||
|
||||
@@ -93,4 +102,84 @@ public class AccountControllerTest {
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyToken() throws Exception {
|
||||
when(timeProvider.getCurrentTimeMillis()).thenReturn(1415917053106L);
|
||||
|
||||
String token = SENDER + ":1415906573:af4f046107c21721224a";
|
||||
|
||||
ClientResponse response =
|
||||
resources.client().resource(String.format("/v1/accounts/token/%s", token))
|
||||
.header("Authorization", AuthHelper.getAuthHeader(SENDER, "bar"))
|
||||
.entity(new AccountAttributes("keykeykeykey", false, false, 4444))
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertThat(response.getStatus()).isEqualTo(204);
|
||||
|
||||
verify(accountsManager, times(1)).create(isA(Account.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyBadToken() throws Exception {
|
||||
when(timeProvider.getCurrentTimeMillis()).thenReturn(1415917053106L);
|
||||
|
||||
String token = SENDER + ":1415906574:af4f046107c21721224a";
|
||||
|
||||
ClientResponse response =
|
||||
resources.client().resource(String.format("/v1/accounts/token/%s", token))
|
||||
.header("Authorization", AuthHelper.getAuthHeader(SENDER, "bar"))
|
||||
.entity(new AccountAttributes("keykeykeykey", false, false, 4444))
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertThat(response.getStatus()).isEqualTo(403);
|
||||
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyWrongToken() throws Exception {
|
||||
when(timeProvider.getCurrentTimeMillis()).thenReturn(1415917053106L);
|
||||
|
||||
String token = SENDER + ":1415906573:af4f046107c21721224a";
|
||||
|
||||
ClientResponse response =
|
||||
resources.client().resource(String.format("/v1/accounts/token/%s", token))
|
||||
.header("Authorization", AuthHelper.getAuthHeader("+14151111111", "bar"))
|
||||
.entity(new AccountAttributes("keykeykeykey", false, false, 4444))
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertThat(response.getStatus()).isEqualTo(403);
|
||||
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyExpiredToken() throws Exception {
|
||||
when(timeProvider.getCurrentTimeMillis()).thenReturn(1416003757901L);
|
||||
|
||||
String token = SENDER + ":1415906573:af4f046107c21721224a";
|
||||
|
||||
ClientResponse response =
|
||||
resources.client().resource(String.format("/v1/accounts/token/%s", token))
|
||||
.header("Authorization", AuthHelper.getAuthHeader(SENDER, "bar"))
|
||||
.entity(new AccountAttributes("keykeykeykey", false, false, 4444))
|
||||
.type(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(ClientResponse.class);
|
||||
|
||||
assertThat(response.getStatus()).isEqualTo(403);
|
||||
|
||||
verifyNoMoreInteractions(accountsManager);
|
||||
}
|
||||
|
||||
private static byte[] decodeHex(String hex) {
|
||||
try {
|
||||
return Hex.decodeHex(hex.toCharArray());
|
||||
} catch (DecoderException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,145 +0,0 @@
|
||||
package org.whispersystems.textsecuregcm.tests.controllers;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import org.eclipse.jetty.websocket.api.CloseStatus;
|
||||
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.junit.Test;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.controllers.WebsocketController;
|
||||
import org.whispersystems.textsecuregcm.entities.AcknowledgeWebsocketMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.EncryptedOutgoingMessage;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketControllerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class WebsocketControllerTest {
|
||||
|
||||
private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private static final String VALID_USER = "+14152222222";
|
||||
private static final String INVALID_USER = "+14151111111";
|
||||
|
||||
private static final String VALID_PASSWORD = "secure";
|
||||
private static final String INVALID_PASSWORD = "insecure";
|
||||
|
||||
private static final StoredMessages storedMessages = mock(StoredMessages.class);
|
||||
private static final AccountAuthenticator accountAuthenticator = mock(AccountAuthenticator.class);
|
||||
private static final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||
private static final PubSubManager pubSubManager = mock(PubSubManager.class );
|
||||
private static final Account account = mock(Account.class );
|
||||
private static final Device device = mock(Device.class );
|
||||
private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class );
|
||||
private static final Session session = mock(Session.class );
|
||||
private static final PushSender pushSender = mock(PushSender.class);
|
||||
|
||||
@Test
|
||||
public void testCredentials() throws Exception {
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
|
||||
.thenReturn(Optional.of(account));
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(INVALID_USER, INVALID_PASSWORD))))
|
||||
.thenReturn(Optional.<Account>absent());
|
||||
|
||||
when(session.getUpgradeRequest()).thenReturn(upgradeRequest);
|
||||
|
||||
WebsocketController controller = new WebsocketController(accountAuthenticator, accountsManager, pushSender, pubSubManager, storedMessages);
|
||||
|
||||
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
|
||||
put("login", new String[] {VALID_USER});
|
||||
put("password", new String[] {VALID_PASSWORD});
|
||||
}});
|
||||
|
||||
controller.onWebSocketConnect(session);
|
||||
|
||||
verify(session, never()).close();
|
||||
verify(session, never()).close(any(CloseStatus.class));
|
||||
verify(session, never()).close(anyInt(), anyString());
|
||||
|
||||
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
|
||||
put("login", new String[] {INVALID_USER});
|
||||
put("password", new String[] {INVALID_PASSWORD});
|
||||
}});
|
||||
|
||||
controller.onWebSocketConnect(session);
|
||||
|
||||
verify(session).close(any(CloseStatus.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpen() throws Exception {
|
||||
RemoteEndpoint remote = mock(RemoteEndpoint.class);
|
||||
|
||||
List<PendingMessage> outgoingMessages = new LinkedList<PendingMessage>() {{
|
||||
add(new PendingMessage("sender1", 1111, false, "first"));
|
||||
add(new PendingMessage("sender1", 2222, false, "second"));
|
||||
add(new PendingMessage("sender2", 3333, false, "third"));
|
||||
}};
|
||||
|
||||
when(device.getId()).thenReturn(2L);
|
||||
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
|
||||
when(account.getNumber()).thenReturn("+14152222222");
|
||||
when(session.getRemote()).thenReturn(remote);
|
||||
when(session.getUpgradeRequest()).thenReturn(upgradeRequest);
|
||||
|
||||
final Device sender1device = mock(Device.class);
|
||||
|
||||
List<Device> sender1devices = new LinkedList<Device>() {{
|
||||
add(sender1device);
|
||||
}};
|
||||
|
||||
Account sender1 = mock(Account.class);
|
||||
when(sender1.getDevices()).thenReturn(sender1devices);
|
||||
|
||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
|
||||
|
||||
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
|
||||
put("login", new String[] {VALID_USER});
|
||||
put("password", new String[] {VALID_PASSWORD});
|
||||
}});
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
|
||||
.thenReturn(Optional.of(account));
|
||||
|
||||
when(storedMessages.getMessagesForDevice(new WebsocketAddress(account.getNumber(), device.getId())))
|
||||
.thenReturn(outgoingMessages);
|
||||
|
||||
WebsocketControllerFactory factory = new WebsocketControllerFactory(accountAuthenticator, accountsManager, pushSender, storedMessages, pubSubManager);
|
||||
WebsocketController controller = (WebsocketController) factory.createWebSocket(null, null);
|
||||
|
||||
controller.onWebSocketConnect(session);
|
||||
|
||||
verify(pubSubManager).subscribe(eq(new WebsocketAddress("+14152222222", 2L)), eq((controller)));
|
||||
verify(remote, times(3)).sendStringByFuture(anyString());
|
||||
|
||||
controller.onWebSocketText(mapper.writeValueAsString(new AcknowledgeWebsocketMessage(1)));
|
||||
controller.onWebSocketClose(1000, "Closed");
|
||||
|
||||
List<PendingMessage> pending = new LinkedList<PendingMessage>() {{
|
||||
add(new PendingMessage("sender1", 1111, false, "first"));
|
||||
add(new PendingMessage("sender2", 3333, false, "third"));
|
||||
}};
|
||||
|
||||
verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(PendingMessage.class));
|
||||
verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(MessageProtos.OutgoingMessageSignal.class));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,182 @@
|
||||
package org.whispersystems.textsecuregcm.tests.websocket;
|
||||
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.whispersystems.textsecuregcm.auth.AccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.entities.MessageProtos;
|
||||
import org.whispersystems.textsecuregcm.entities.PendingMessage;
|
||||
import org.whispersystems.textsecuregcm.push.PushSender;
|
||||
import org.whispersystems.textsecuregcm.storage.Account;
|
||||
import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||
import org.whispersystems.textsecuregcm.storage.Device;
|
||||
import org.whispersystems.textsecuregcm.storage.PubSubManager;
|
||||
import org.whispersystems.textsecuregcm.storage.StoredMessages;
|
||||
import org.whispersystems.textsecuregcm.websocket.ConnectListener;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebSocketConnection;
|
||||
import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||
import org.whispersystems.websocket.WebSocketClient;
|
||||
import org.whispersystems.websocket.messages.WebSocketResponseMessage;
|
||||
import org.whispersystems.websocket.session.WebSocketSessionContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import io.dropwizard.auth.basic.BasicCredentials;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class WebSocketConnectionTest {
|
||||
|
||||
// private static final ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
private static final String VALID_USER = "+14152222222";
|
||||
private static final String INVALID_USER = "+14151111111";
|
||||
|
||||
private static final String VALID_PASSWORD = "secure";
|
||||
private static final String INVALID_PASSWORD = "insecure";
|
||||
|
||||
// private static final StoredMessages storedMessages = mock(StoredMessages.class);
|
||||
private static final AccountAuthenticator accountAuthenticator = mock(AccountAuthenticator.class);
|
||||
private static final AccountsManager accountsManager = mock(AccountsManager.class);
|
||||
private static final PubSubManager pubSubManager = mock(PubSubManager.class );
|
||||
private static final Account account = mock(Account.class );
|
||||
private static final Device device = mock(Device.class );
|
||||
private static final UpgradeRequest upgradeRequest = mock(UpgradeRequest.class );
|
||||
// private static final Session session = mock(Session.class );
|
||||
private static final PushSender pushSender = mock(PushSender.class);
|
||||
|
||||
@Test
|
||||
public void testCredentials() throws Exception {
|
||||
StoredMessages storedMessages = mock(StoredMessages.class);
|
||||
WebSocketAccountAuthenticator webSocketAuthenticator = new WebSocketAccountAuthenticator(accountAuthenticator);
|
||||
ConnectListener connectListener = new ConnectListener(accountsManager, pushSender, storedMessages, pubSubManager);
|
||||
WebSocketSessionContext sessionContext = mock(WebSocketSessionContext.class);
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(VALID_USER, VALID_PASSWORD))))
|
||||
.thenReturn(Optional.of(account));
|
||||
|
||||
when(accountAuthenticator.authenticate(eq(new BasicCredentials(INVALID_USER, INVALID_PASSWORD))))
|
||||
.thenReturn(Optional.<Account>absent());
|
||||
|
||||
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
|
||||
|
||||
// when(session.getUpgradeRequest()).thenReturn(upgradeRequest);
|
||||
//
|
||||
// WebsocketController controller = new WebsocketController(accountAuthenticator, accountsManager, pushSender, pubSubManager, storedMessages);
|
||||
|
||||
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
|
||||
put("login", new String[] {VALID_USER});
|
||||
put("password", new String[] {VALID_PASSWORD});
|
||||
}});
|
||||
|
||||
Optional<Account> account = webSocketAuthenticator.authenticate(upgradeRequest);
|
||||
when(sessionContext.getAuthenticated(Account.class)).thenReturn(account);
|
||||
|
||||
connectListener.onWebSocketConnect(sessionContext);
|
||||
|
||||
verify(sessionContext).addListener(any(WebSocketSessionContext.WebSocketEventListener.class));
|
||||
|
||||
//
|
||||
// controller.onWebSocketConnect(session);
|
||||
|
||||
// verify(session, never()).close();
|
||||
// verify(session, never()).close(any(CloseStatus.class));
|
||||
// verify(session, never()).close(anyInt(), anyString());
|
||||
|
||||
when(upgradeRequest.getParameterMap()).thenReturn(new HashMap<String, String[]>() {{
|
||||
put("login", new String[] {INVALID_USER});
|
||||
put("password", new String[] {INVALID_PASSWORD});
|
||||
}});
|
||||
|
||||
account = webSocketAuthenticator.authenticate(upgradeRequest);
|
||||
when(sessionContext.getAuthenticated(Account.class)).thenReturn(account);
|
||||
|
||||
WebSocketClient client = mock(WebSocketClient.class);
|
||||
when(sessionContext.getClient()).thenReturn(client);
|
||||
|
||||
connectListener.onWebSocketConnect(sessionContext);
|
||||
|
||||
verify(sessionContext, times(1)).addListener(any(WebSocketSessionContext.WebSocketEventListener.class));
|
||||
verify(client).close(eq(4001), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOpen() throws Exception {
|
||||
StoredMessages storedMessages = mock(StoredMessages.class);
|
||||
|
||||
List<PendingMessage> outgoingMessages = new LinkedList<PendingMessage>() {{
|
||||
add(new PendingMessage("sender1", 1111, false, "first"));
|
||||
add(new PendingMessage("sender1", 2222, false, "second"));
|
||||
add(new PendingMessage("sender2", 3333, false, "third"));
|
||||
}};
|
||||
|
||||
when(device.getId()).thenReturn(2L);
|
||||
when(account.getAuthenticatedDevice()).thenReturn(Optional.of(device));
|
||||
when(account.getNumber()).thenReturn("+14152222222");
|
||||
|
||||
final Device sender1device = mock(Device.class);
|
||||
|
||||
List<Device> sender1devices = new LinkedList<Device>() {{
|
||||
add(sender1device);
|
||||
}};
|
||||
|
||||
Account sender1 = mock(Account.class);
|
||||
when(sender1.getDevices()).thenReturn(sender1devices);
|
||||
|
||||
when(accountsManager.get("sender1")).thenReturn(Optional.of(sender1));
|
||||
when(accountsManager.get("sender2")).thenReturn(Optional.<Account>absent());
|
||||
|
||||
when(storedMessages.getMessagesForDevice(new WebsocketAddress(account.getNumber(), device.getId())))
|
||||
.thenReturn(outgoingMessages);
|
||||
|
||||
final List<SettableFuture<WebSocketResponseMessage>> futures = new LinkedList<>();
|
||||
final WebSocketClient client = mock(WebSocketClient.class);
|
||||
|
||||
when(client.sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class)))
|
||||
.thenAnswer(new Answer<SettableFuture<WebSocketResponseMessage>>() {
|
||||
@Override
|
||||
public SettableFuture<WebSocketResponseMessage> answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
SettableFuture<WebSocketResponseMessage> future = SettableFuture.create();
|
||||
futures.add(future);
|
||||
return future;
|
||||
}
|
||||
});
|
||||
|
||||
WebSocketConnection connection = new WebSocketConnection(accountsManager, pushSender, storedMessages,
|
||||
pubSubManager, account, device, client);
|
||||
|
||||
connection.onConnected();
|
||||
|
||||
verify(pubSubManager).subscribe(eq(new WebsocketAddress("+14152222222", 2L)), eq((connection)));
|
||||
verify(client, times(3)).sendRequest(eq("PUT"), eq("/api/v1/message"), any(Optional.class));
|
||||
|
||||
assertTrue(futures.size() == 3);
|
||||
|
||||
WebSocketResponseMessage response = mock(WebSocketResponseMessage.class);
|
||||
when(response.getStatus()).thenReturn(200);
|
||||
futures.get(1).set(response);
|
||||
|
||||
futures.get(0).setException(new IOException());
|
||||
futures.get(2).setException(new IOException());
|
||||
|
||||
List<PendingMessage> pending = new LinkedList<PendingMessage>() {{
|
||||
add(new PendingMessage("sender1", 1111, false, "first"));
|
||||
add(new PendingMessage("sender2", 3333, false, "third"));
|
||||
}};
|
||||
|
||||
verify(pushSender, times(2)).sendMessage(eq(account), eq(device), any(PendingMessage.class));
|
||||
verify(pushSender, times(1)).sendMessage(eq(sender1), eq(sender1device), any(MessageProtos.OutgoingMessageSignal.class));
|
||||
|
||||
connection.onConnectionLost();
|
||||
verify(pubSubManager).unsubscribe(eq(new WebsocketAddress("+14152222222", 2L)), eq(connection));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user