みなさんこんにちは。今岡です。
今年のDreamforceではmyシリーズとしていくつかの重要な発表がありましたが、今回の記事ではmyIoTを裏で支えるPlatformEventについてサンプルアプリの開発を通じて触れてみたいと思います。(myIoTではデバイスなどからのストリームデータの受け口としてPlatformEventが使われます。)
PlatformEventはSummer’17でGAになった比較的新しい機能です。これまでSalesforceから外部システムとのインテグレーションには、Outbound Message、Apex Callout、Streaming API、あるいは外部システムからSOAPやRest APIなどを使って定期的にポーリングするなどで行われてきました。各々の手段には、起動条件の柔軟性/ファイアーウォールを超えたアクセス/耐障害性/実行の効率性/負荷の分散性など、それぞれメリット・デメリットがありました。PlatformEventはMQ(Message Queueing)方式によるシステム統合が行える新たな手段として注目できます。
サンプルアプリのシナリオ
Salesforceと外部システム間で注文データを連携させるシナリオとします。Salesforceには注文カスタムオブジェクトがあり、注文カスタムオブジェクトにはステータスを持っています。このステータスが「出荷依頼」となった時、ApexトリガでメッセージをPublishします。外部システムはJavaで実装し、「出荷依頼」となったメッセージがほぼリアルタイムにSubscirbeできることを確認します。
Salesforce側のカスタマイズと開発
プラットフォームイベントにメッセージをPublishするための設定と開発には新たな作法などなく、これまでの知見で対応できます。Publishにはプロセスビルダーも使用することが出来ますが、今回はApexトリガで行います。
①注文ヘッダオブジェクトの作成
オブジェクト定義
設定項目 | 設定値 |
---|---|
オブジェクト名 | 注文 |
API参照名 | DemoOrder__c |
レコード名 | 注文No. |
データ型 | 自動採番 |
表示形式 | {00000000} |
カスタム項目
項目名 | API参照名 | データ型 | 設定値 |
---|---|---|---|
ステータス | Status__c | 選択リスト | 受注,出荷依頼,出荷済み,請求依頼,請求済み |
②プラットフォームイベントの作成
設定 > データ > プラットフォームイベント
プラットフォームイベント定義
設定項目 | 設定値 |
---|---|
オブジェクト名 | 注文イベント |
API参照名 | DemoOrderEvent__e |
カスタム項目
項目名 | API参照名 | データ型 | 設定値 |
---|---|---|---|
注文No. | OrderNumber__c | テキスト(255) | Apexトリガで注文オブジェクトの注文No.を設定 |
ステータス | Status__c | テキスト(255) | Apexトリガで注文オブジェクトのステータスを設定 |
③Apexトリガの作成
ApexトリガのAPIバージョンは40.0以上を指定
trigger DemoOrderTrigger on DemoOrder__c (after update) { List<DemoOrderEvent__e> events = new List<DemoOrderEvent__e>(); for(DemoOrder__c od : Trigger.new){ if(!String.isEmpty(od.Status__c) && od.Status__c == '出荷依頼'){ DemoOrderEvent__e ev = new DemoOrderEvent__e(); ev.OrderNumber__c = od.Name; ev.Status__c = od.Status__c; events.add(ev); } } if(!events.isEmpty()){ EventBus.publish(events); } }
証明書の作成&接続アプリケーション設定
JavaアプリからSalesforceのプラットフォームイベントをSubscribeする上でアクセストークンが必要になります。今回のサンプルアプリでは OAuth 2.0 JWT Bearer Token Flow を使用してアクセストークンを取得することにします。
①自己署名証書の作成とエクスポート
サンプルなのでいわゆるオレオレ証明書を作成します。作成にはJavaに付属するkeytoolを使用します。
1. 鍵の生成のコマンド
#!/bin/sh keytool -genkeypair -alias keystore -keyalg RSA -keysize 2048 -keystore keystore.jks -storetype JKS -validity 90 -keypass keypassword -storepass storepassword
#!/bin/sh keytool -v -export -file mycert.cer -keystore keystore.jks -storepass storepassword -alias keystore
3. 接続アプリケーションを設定
設定 > 作成 > アプリケーション > 新規接続アプリケーション
設定項目 | 設定値 |
---|---|
接続アプリケーション名 | OAuth JWT Sample |
API参照名 | OAuth_JWT_Sample |
取引先責任者メール | 任意のメールアドレス |
OAuth設定の有効化 | チェック |
デバイスフローで有効化 ※1 | チェック |
コールバックURL ※2 | https://login.salesforce.com/services/oauth2/success |
デジタル署名を使用 | チェック |
ファイルを選択 | 手順2でエクスポートした証明書ファイルを選択(ex. mycert.cer) |
選択したOAuth範囲 | データへのアクセスと管理(api)、基本情報へのアクセス(id, profile, email, address, phone) |
※1: チェックするとコールバックURLにデフォルト値が設定されるため便宜上チェックしています
※2: コールバックは行いませんが設定が必須のため設定します
Javaアプリ開発(Subscriber)
PlatformEventからメッセージをSubscribeするためにはCometDなどを使って行います。また、Salesforceへの接続にはアクセストークンが必要となるため、トークンの取得にはJWTを使って行うことにします。
CometDを使った実装としては、EMP-Connectorが提供されています。実開発ではこちらをベースに開発するのが良いと思いますが、今回はより簡易な実装サンプルとして開発を進めたいと思います。
Mavenプロジェクトの作成&pom.xmlの編集
サンプルでは”consumerexample”というプロジェクト名でMavenプロジェクトを作成し、pom.xmlを以下のように記述します。依存関係として、"jackson-databind"を使いJSONの解析しJavaのオブジェクトへバインド、"cometd-java-client"を使ってCometDクライアントの実装を行います。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.gmail.jimaoka.platformevent</groupId> <artifactId>consumerexample</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>consumerexample</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.cometd.java/cometd-java-client --> <dependency> <groupId>org.cometd.java</groupId> <artifactId>cometd-java-client</artifactId> <version>3.1.2</version> </dependency> </dependencies> </project>
クラス作成
以下のクラスを作成します。
クラス名 | 説明 |
---|---|
App.java | Mainクラス |
OAuthService.java | OAuth2 Bearer token flowで認証を行うクラス |
OAuthResponse.java | Salesforceからレスポンスされるアクセストークンなどを保持するクラス |
PlatformEventService.java | SalesforceのPlatformEventをSubscribeするクラス |
App.java
Mainクラスです。1.JWT Beare Tokenを作成、 2. OAuth2.0で認証、3. PlatformEventのSubscribeを開始を行います。
package com.gmail.jimaoka.platformevent.consumerexample; import com.gmail.jimaoka.platformevent.consumerexample.client.PlatformEventService; import com.gmail.jimaoka.platformevent.consumerexample.oauth.OAuthResponse; import com.gmail.jimaoka.platformevent.consumerexample.oauth.OAuthService; public class App { public static void main( String[] args ){ OAuthService oauthService = new OAuthService(); try { String jwtBeareToken = oauthService.getJWTBeareToken(); System.out.println("jwtBeareToken:" + jwtBeareToken); OAuthResponse oauthResponse = oauthService.getOAuthResponse(jwtBeareToken); System.out.println("access_token:" + oauthResponse.getAccess_token()); PlatformEventService pevService = new PlatformEventService(oauthResponse.getInstance_url(), oauthResponse.getAccess_token()); pevService.start(); } catch (Exception e) { System.out.println("exception"); e.printStackTrace(); } } }
OAuthService.java
JWT Beare tokenの作成、 Salesforceに対して認証を行いレスポンスされるアクセストークン等をJavaオブジェクトに設定します。
package com.gmail.jimaoka.platformevent.consumerexample.oauth; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.InvalidKeyException; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.PrivateKey; import java.security.Signature; import java.security.SignatureException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.text.MessageFormat; import java.util.Base64; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.util.ssl.SslContextFactory; import com.fasterxml.jackson.databind.ObjectMapper; public class OAuthService { private static final String END_POINT = "https://login.salesforce.com/services/oauth2/token"; private static final String AUDIENCE = "https://login.salesforce.com"; // 接続アプリケーションのコンシュマー鍵の値を設定 private static final String CLIENT_ID = "3MVG9yZ.WNe6byQCanC7xsqg81Nfl068RgH1FBkktUix_dgDHv2wRVxxxxxxxxxxxxx"; private static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer"; public String getJWTBeareToken() throws KeyStoreException, NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, UnrecoverableKeyException, InvalidKeyException, SignatureException{ String header = "{\"alg\":\"RS256\"}"; String claimTemplate = "'{'\"iss\": \"{0}\", \"sub\": \"{1}\", \"aud\": \"{2}\", \"exp\": \"{3}\"'}'"; StringBuffer token = new StringBuffer(); token.append(Base64.getUrlEncoder().encodeToString(header.getBytes("UTF-8"))); token.append("."); String[] claimArray = new String[4]; claimArray[0] = CLIENT_ID; // 認証を行うSalesforceユーザID claimArray[1] = "hogehoge@gmail.com"; claimArray[2] = AUDIENCE; claimArray[3] = Long.toString((System.currentTimeMillis() / 1000) + 300); MessageFormat claims = new MessageFormat(claimTemplate); String payload = claims.format(claimArray); token.append(Base64.getUrlEncoder().encodeToString(payload.getBytes("UTF-8"))); KeyStore keystore = KeyStore.getInstance("JKS"); // 鍵のパス keystore.load(new FileInputStream("./keystore/keystore.jks"), "storepassword".toCharArray()); PrivateKey privateKey = (PrivateKey)keystore.getKey("keystore", "keypassword".toCharArray()); Signature signature = Signature.getInstance("SHA256withRSA"); signature.initSign(privateKey); signature.update(token.toString().getBytes("UTF-8")); String signedPayload = Base64.getUrlEncoder().encodeToString(signature.sign()); token.append("."); token.append(signedPayload); return token.toString(); } public OAuthResponse getOAuthResponse(String jwtBeareToken) throws Exception { SslContextFactory sslContextFactory = new SslContextFactory(); HttpClient httpClient = new HttpClient(sslContextFactory); httpClient.start(); ContentResponse response = httpClient.POST(END_POINT) .param("grant_type", GRANT_TYPE) .param("assertion", jwtBeareToken) .send(); httpClient.stop(); ObjectMapper objectMapper = new ObjectMapper(); OAuthResponse oauthResponse = objectMapper.readValue(response.getContentAsString(), OAuthResponse.class); return oauthResponse; } public OAuthResponse getOAuthResponse() throws Exception { return getOAuthResponse(getJWTBeareToken()); } }
OAuthResponse.java
Salesforceからレスポンスされるアクセストークン、インスタンスURLなどの情報を保持するためのクラスです。
package com.gmail.jimaoka.platformevent.consumerexample.oauth; public class OAuthResponse { private String access_token; private String scope; private String instance_url; private String token_type; private String id; public String getAccess_token() { return access_token; } public void setAccess_token(String access_token) { this.access_token = access_token; } public String getScope() { return scope; } public void setScope(String scope) { this.scope = scope; } public String getInstance_url() { return instance_url; } public void setInstance_url(String instance_url) { this.instance_url = instance_url; } public String getToken_type() { return token_type; } public void setToken_type(String token_type) { this.token_type = token_type; } public String getId() { return id; } public void setId(String id) { this.id = id; } }
PlatformEventService.java
CometDエンドポイントへの接続、およびチャネルをSubscribeします。
package com.gmail.jimaoka.platformevent.consumerexample.client; import org.cometd.client.BayeuxClient; import org.cometd.client.transport.LongPollingTransport; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.util.ssl.SslContextFactory; public class PlatformEventService { private static final String COMETD_ENDPOINT = "/cometd/41.0"; private static final String CHANNEL = "/event/DemoOrderEvent__e"; private String instanceUrl; private String accessToken; private HttpClient httpClient; private BayeuxClient client; public PlatformEventService(String instanceUrl, String accessToken){ this.instanceUrl = instanceUrl; this.accessToken = accessToken; } public void start() throws Exception{ SslContextFactory sslContextFactory = new SslContextFactory(); httpClient = new HttpClient(sslContextFactory); httpClient.start(); LongPollingTransport httpTransport = new LongPollingTransport(null, httpClient){ @Override protected void customize(Request request) { request.header("Authorization", "Beare " + accessToken); } }; client = new BayeuxClient(instanceUrl + COMETD_ENDPOINT, httpTransport); client.handshake((channel, message) -> { if(message.isSuccessful()){ System.out.println(message.toString()); } }); client.getChannel(CHANNEL).subscribe((channel, message) -> { System.out.println(message.toString()); }); } public void stop() throws Exception{ if(client != null){ client.disconnect(); } if(httpClient != null){ httpClient.stop(); } } }
動作確認
Javaアプリを起動し、Salesforce上で注文レコードを作成&編集します。注文レコードのステータスを"出荷依頼"とした際に、Javaを起動したコマンドラインに受診したメッセージが表示されれば成功です!
おわりに
PlatformEventを試す簡単なサンプルアプリを紹介しましたが如何でしたでしょうか? サンプルアプリの開発を通じてPlatformEventの動作を知り、Salesforceと外部システムのインテグレーションがより柔軟にできる可能性を感じてもらえたら幸いです。