2017.11.27

PlatformEventに触れてみよう(Javaアプリでイベントを購読してみる)

  • このエントリーをはてなブックマークに追加
  • follow us in feedly

みなさんこんにちは。今岡です。

今年のDreamforceではmyシリーズとしていくつかの重要な発表がありましたが、今回の記事ではmyIoTを裏で支えるPlatformEventについてサンプルアプリの開発を通じて触れてみたいと思います。(myIoTではデバイスなどからのストリームデータの受け口としてPlatformEventが使われます。)

PlatformEventはSummer’17でGAになった比較的新しい機能です。これまでSalesforceから外部システムとのインテグレーションには、Outbound Message、Apex Callout、Streaming API、あるいは外部システムからSOAPやRest APIなどを使って定期的にポーリングするなどで行われてきました。各々の手段には、起動条件の柔軟性/ファイアーウォールを超えたアクセス/耐障害性/実行の効率性/負荷の分散性など、それぞれメリット・デメリットがありました。PlatformEventはMQ(Message Queueing)方式によるシステム統合が行える新たな手段として注目できます。

サンプルアプリのシナリオ

Salesforceと外部システム間で注文データを連携させるシナリオとします。Salesforceには注文カスタムオブジェクトがあり、注文カスタムオブジェクトにはステータスを持っています。このステータスが「出荷依頼」となった時、ApexトリガでメッセージをPublishします。外部システムはJavaで実装し、「出荷依頼」となったメッセージがほぼリアルタイムにSubscirbeできることを確認します。

Salesforce側のカスタマイズと開発

プラットフォームイベントにメッセージをPublishするための設定と開発には新たな作法などなく、これまでの知見で対応できます。Publishにはプロセスビルダーも使用することが出来ますが、今回はApexトリガで行います。

①注文ヘッダオブジェクトの作成

オブジェクト定義

設定項目 設定値
オブジェクト名 注文
API参照名 DemoOrder__c
レコード名 注文No.
データ型 自動採番
表示形式 {00000000}

カスタム項目

項目名 API参照名 データ型 設定値
ステータス Status__c 選択リスト 受注,出荷依頼,出荷済み,請求依頼,請求済み

②プラットフォームイベントの作成
設定 > データ > プラットフォームイベント

プラットフォームイベント定義

設定項目 設定値
オブジェクト名 注文イベント
API参照名 DemoOrderEvent__e

カスタム項目

項目名 API参照名 データ型 設定値
注文No. OrderNumber__c テキスト(255) Apexトリガで注文オブジェクトの注文No.を設定
ステータス Status__c テキスト(255) Apexトリガで注文オブジェクトのステータスを設定

③Apexトリガの作成
ApexトリガのAPIバージョンは40.0以上を指定

trigger DemoOrderTrigger on DemoOrder__c (after update) {
    List<DemoOrderEvent__e> events = new List<DemoOrderEvent__e>();
    for(DemoOrder__c od : Trigger.new){
        if(!String.isEmpty(od.Status__c) && od.Status__c == '出荷依頼'){
            DemoOrderEvent__e ev = new DemoOrderEvent__e();
            ev.OrderNumber__c = od.Name;
            ev.Status__c = od.Status__c;
            events.add(ev);
        }
    }
    if(!events.isEmpty()){
        EventBus.publish(events);
    }
}
DemoOrderTrigger.trigger

証明書の作成&接続アプリケーション設定

JavaアプリからSalesforceのプラットフォームイベントをSubscribeする上でアクセストークンが必要になります。今回のサンプルアプリでは OAuth 2.0 JWT Bearer Token Flow を使用してアクセストークンを取得することにします。

①自己署名証書の作成とエクスポート
サンプルなのでいわゆるオレオレ証明書を作成します。作成にはJavaに付属するkeytoolを使用します。

1. 鍵の生成のコマンド

#!/bin/sh
keytool -genkeypair -alias keystore -keyalg RSA -keysize 2048 -keystore keystore.jks -storetype JKS -validity 90 -keypass keypassword -storepass storepassword
genkey.sh
2. 証明書のエクスポート
#!/bin/sh
keytool -v -export -file mycert.cer -keystore keystore.jks -storepass storepassword -alias keystore
export.sh

3. 接続アプリケーションを設定
設定 > 作成 > アプリケーション > 新規接続アプリケーション

設定項目 設定値
接続アプリケーション名 OAuth JWT Sample
API参照名 OAuth_JWT_Sample
取引先責任者メール 任意のメールアドレス
OAuth設定の有効化 チェック
デバイスフローで有効化 ※1 チェック
コールバックURL ※2 https://login.salesforce.com/services/oauth2/success
選択したOAuth範囲 データへのアクセスと管理(api)、基本情報へのアクセス(id, profile, email, address, phone)

※1: チェックするとコールバックURLにデフォルト値が設定されるため便宜上チェックしています
※2: コールバックは行いませんが設定が必須のため設定します

接続アプリケーションの設定を保存し、詳細ページに表示されるコンシュマー鍵の値を控えておきます。続いてOAuthポリシーの設定を行うため、Manageボタンをクリックします。

OAuthポリシーセクションの許可されているユーザの設定値を「管理者が承認したユーザは事前承認済み」を選択します。さらに、この接続アプリケーションを使用できるプロファイルの割り当てを行います。今回は「システム管理者」を選択します。この設定によりJWTを使用してパスワードを使うことなくアクセストークンが取得できるようになります。

Javaアプリ開発(Subscriber)

PlatformEventからメッセージをSubscribeするためにはCometDなどを使って行います。また、Salesforceへの接続にはアクセストークンが必要となるため、トークンの取得にはJWTを使って行うことにします。

CometDを使った実装としては、EMP-Connectorが提供されています。実開発ではこちらをベースに開発するのが良いと思いますが、今回はより簡易な実装サンプルとして開発を進めたいと思います。

Mavenプロジェクトの作成&pom.xmlの編集

サンプルでは”consumerexample”というプロジェクト名でMavenプロジェクトを作成し、pom.xmlを以下のように記述します。依存関係として、"jackson-databind"を使いJSONの解析しJavaのオブジェクトへバインド、"cometd-java-client"を使ってCometDクライアントの実装を行います。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.gmail.jimaoka.platformevent</groupId>
  <artifactId>consumerexample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>consumerexample</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.cometd.java/cometd-java-client -->
        <dependency>
            <groupId>org.cometd.java</groupId>
            <artifactId>cometd-java-client</artifactId>
            <version>3.1.2</version>
        </dependency>
  </dependencies>
</project>
pom.xml

クラス作成

以下のクラスを作成します。

クラス名 説明
App.java Mainクラス
OAuthService.java OAuth2 Bearer token flowで認証を行うクラス
OAuthResponse.java Salesforceからレスポンスされるアクセストークンなどを保持するクラス
PlatformEventService.java SalesforceのPlatformEventをSubscribeするクラス

App.java
Mainクラスです。1.JWT Beare Tokenを作成、 2. OAuth2.0で認証、3. PlatformEventのSubscribeを開始を行います。

package com.gmail.jimaoka.platformevent.consumerexample;

import com.gmail.jimaoka.platformevent.consumerexample.client.PlatformEventService;
import com.gmail.jimaoka.platformevent.consumerexample.oauth.OAuthResponse;
import com.gmail.jimaoka.platformevent.consumerexample.oauth.OAuthService;

public class App {
  public static void main( String[] args ){
    OAuthService oauthService = new OAuthService();
    try {
      String jwtBeareToken = oauthService.getJWTBeareToken();
      System.out.println("jwtBeareToken:" + jwtBeareToken);
      OAuthResponse oauthResponse = oauthService.getOAuthResponse(jwtBeareToken);
      System.out.println("access_token:" + oauthResponse.getAccess_token());
      PlatformEventService pevService = 
          new PlatformEventService(oauthResponse.getInstance_url(), oauthResponse.getAccess_token());
      pevService.start();
    } catch (Exception e) {
      System.out.println("exception");
      e.printStackTrace();
    }
  }
}
App.java

OAuthService.java
JWT Beare tokenの作成、 Salesforceに対して認証を行いレスポンスされるアクセストークン等をJavaオブジェクトに設定します。

package com.gmail.jimaoka.platformevent.consumerexample.oauth;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.text.MessageFormat;
import java.util.Base64;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.util.ssl.SslContextFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

public class OAuthService {
  private static final String END_POINT = "https://login.salesforce.com/services/oauth2/token";
  private static final String AUDIENCE = "https://login.salesforce.com";
  // 接続アプリケーションのコンシュマー鍵の値を設定 
  private static final String CLIENT_ID = "3MVG9yZ.WNe6byQCanC7xsqg81Nfl068RgH1FBkktUix_dgDHv2wRVxxxxxxxxxxxxx";
  private static final String GRANT_TYPE = "urn:ietf:params:oauth:grant-type:jwt-bearer";
  
  public String getJWTBeareToken() throws KeyStoreException, NoSuchAlgorithmException, CertificateException, FileNotFoundException, IOException, UnrecoverableKeyException, InvalidKeyException, SignatureException{
    String header = "{\"alg\":\"RS256\"}";
    String claimTemplate = "'{'\"iss\": \"{0}\", \"sub\": \"{1}\", \"aud\": \"{2}\", \"exp\": \"{3}\"'}'";
      
    StringBuffer token = new StringBuffer();
    token.append(Base64.getUrlEncoder().encodeToString(header.getBytes("UTF-8")));
    token.append(".");
      
    String[] claimArray = new String[4];
    claimArray[0] = CLIENT_ID;
    // 認証を行うSalesforceユーザID
    claimArray[1] = "hogehoge@gmail.com";
    claimArray[2] = AUDIENCE;
    claimArray[3] = Long.toString((System.currentTimeMillis() / 1000) + 300);
      
    MessageFormat claims = new MessageFormat(claimTemplate);
    String payload = claims.format(claimArray);
    token.append(Base64.getUrlEncoder().encodeToString(payload.getBytes("UTF-8")));

    KeyStore keystore = KeyStore.getInstance("JKS");
    // 鍵のパス
    keystore.load(new FileInputStream("./keystore/keystore.jks"), "storepassword".toCharArray());
    PrivateKey privateKey = (PrivateKey)keystore.getKey("keystore", "keypassword".toCharArray());
      
    Signature signature = Signature.getInstance("SHA256withRSA");
    signature.initSign(privateKey);
    signature.update(token.toString().getBytes("UTF-8"));
    String signedPayload = Base64.getUrlEncoder().encodeToString(signature.sign());
      
    token.append(".");
    token.append(signedPayload);
    return token.toString();
  }
  
  public OAuthResponse getOAuthResponse(String jwtBeareToken) throws Exception {
    SslContextFactory sslContextFactory = new SslContextFactory();
    HttpClient httpClient = new HttpClient(sslContextFactory);
    httpClient.start();
    ContentResponse response = httpClient.POST(END_POINT)
        .param("grant_type", GRANT_TYPE)
        .param("assertion", jwtBeareToken)
        .send();
    httpClient.stop();
    
    ObjectMapper objectMapper = new ObjectMapper();
    OAuthResponse oauthResponse = 
        objectMapper.readValue(response.getContentAsString(), OAuthResponse.class);
    return oauthResponse;
  }

  public OAuthResponse getOAuthResponse() throws  Exception {
    return getOAuthResponse(getJWTBeareToken());
  }
}
OAuthService.java

OAuthResponse.java
Salesforceからレスポンスされるアクセストークン、インスタンスURLなどの情報を保持するためのクラスです。

package com.gmail.jimaoka.platformevent.consumerexample.oauth;

public class OAuthResponse {
  private String access_token;
  private String scope;
  private String instance_url;
  private String token_type;
  private String id;
  
  public String getAccess_token() {
    return access_token;
  }
  public void setAccess_token(String access_token) {
    this.access_token = access_token;
  }
  public String getScope() {
    return scope;
  }
  public void setScope(String scope) {
    this.scope = scope;
  }
  public String getInstance_url() {
    return instance_url;
  }
  public void setInstance_url(String instance_url) {
    this.instance_url = instance_url;
  }
  public String getToken_type() {
    return token_type;
  }
  public void setToken_type(String token_type) {
    this.token_type = token_type;
  }
  public String getId() {
    return id;
  }
  public void setId(String id) {
    this.id = id;
  }
}
OAuthResponse.java

PlatformEventService.java
CometDエンドポイントへの接続、およびチャネルをSubscribeします。

package com.gmail.jimaoka.platformevent.consumerexample.client;

import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.ssl.SslContextFactory;

public class PlatformEventService {
  private static final String COMETD_ENDPOINT = "/cometd/41.0";
  private static final String CHANNEL = "/event/DemoOrderEvent__e";

  private String instanceUrl;
  private String accessToken;
  private HttpClient httpClient;
  private BayeuxClient client;
  
  public PlatformEventService(String instanceUrl, String accessToken){
    this.instanceUrl = instanceUrl;
    this.accessToken = accessToken;
  }
  
  public void start() throws Exception{
    SslContextFactory sslContextFactory = new SslContextFactory();
    httpClient = new HttpClient(sslContextFactory);
    httpClient.start();
    
    LongPollingTransport httpTransport = new LongPollingTransport(null, httpClient){
      @Override
      protected void customize(Request request) {
        request.header("Authorization", "Beare " + accessToken);
      }
    };
    
    client = new BayeuxClient(instanceUrl + COMETD_ENDPOINT, httpTransport);
    client.handshake((channel, message) -> {
      if(message.isSuccessful()){
        System.out.println(message.toString());
      }
    });

    client.getChannel(CHANNEL).subscribe((channel, message) -> {
      System.out.println(message.toString());
    });
  }
  
  public void stop() throws Exception{
    if(client != null){
      client.disconnect();
    }
    if(httpClient != null){
      httpClient.stop();
    }
  }
}
PlatformEventService.java

動作確認

Javaアプリを起動し、Salesforce上で注文レコードを作成&編集します。注文レコードのステータスを"出荷依頼"とした際に、Javaを起動したコマンドラインに受診したメッセージが表示されれば成功です!

おわりに

PlatformEventを試す簡単なサンプルアプリを紹介しましたが如何でしたでしょうか? サンプルアプリの開発を通じてPlatformEventの動作を知り、Salesforceと外部システムのインテグレーションがより柔軟にできる可能性を感じてもらえたら幸いです。

40 件

関連する記事