OAuth Consumer Request の処理フローと実装

OpenSocial の RESTful API では、OAuth を利用して権限の確認を行います。ただし、コンシューマが完全にユーザに成り代わって処理をするため、コンシューマとプロバイダ二者間の信任フロー(2-legged OAuth)になります。この方法については、まだドラフト段階ですが次の文書に記されています。

» OAuth Consumer Request 1.0 Draft 1

処理フローを以下に説明します。コンシューマは次のパラメータをプロバイダに送信することになります。

oauth_consumer_keyコンシューマキー
oauth_signature_methodHMAC-SHA1
oauth_signatureシグニチャ
oauth_timestampUNIXタイムスタンプ
oauth_nonceランダムな文字列
oauth_version1.0 (オプション)

ここで、 oauth_signature は以下のようにして生成されます。まず、ベース文字列を生成するために次の値を用意します。

  1. GET
  2. http://provider.example.net/profile
  3. oauth_consumer_key=dpf43f3p2l4k3l03&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1191242096&oauth_version=1.0

これらをURIエスケープした後に & で連結して、ベース文字列を生成します。

GET&http%3A%2F%2Fprovider.example.net%2Fprofile&oauth_consumer_key%3D
dpf43f3p2l4k3l03%26oauth_nonce%3Dkllo9940pd9333jh%26oauth_signature_method
%3DHMAC-SHA1%26oauth_timestamp%3D1191242096%26oauth_version%3D1.0

そして、このベース文字列を HMAC-SHA1 によってダイジェスト値を生成し、BASE64 でエンコードすることによってシグニチャが生成されます。この際、利用する共有キーは cosumer_secret と 空のToken Secret を & で連結したものになります。例えば、 consumer_secret が kd94hf93k423kf44 なら kd94hf93k423kf44& になります(Token Secretは空のため)。こうして、ダイジェスト値は以下のようになります。

SGtGiOrgTGF5Dd4RUMguopweOSU=

こうして生成されたパラメータをリクエスト時の Authorization ヘッダに付加します。プロバイダ側でも同様にダイジェスト値を生成し、シグニチャが合っていればプロバイダが持つリソースへのアクセスを許可します。また、タイムスタンプが5分以上前の場合にはエラーとすることで、リクエストURLが漏れた場合でもセキュリティを確保しています。

Authorization: OAuth realm="http://provider.example.net/",
               oauth_consumer_key="dpf43f3p2l4k3l03",
               oauth_signature_method="HMAC-SHA1",
               oauth_signature="SGtGiOrgTGF5Dd4RUMguopweOSU%3D",
               oauth_timestamp="1191242096",
               oauth_nonce="kllo9940pd9333jh",
               oauth_version="1.0"

さて、以上の処理を Django で実装してみました。コンシューマ側は、普通のPythonスクリプトですので、コンソールから実行することができます。

プロバイダ側

import oauth
from django.shortcuts import *

class MockOAuthDataStore(oauth.OAuthDataStore):
    def __init__(self):
        self.consumer = oauth.OAuthConsumer('key', 'secret')
        self.nonce = 'nonce'

    def lookup_consumer(self, key):
        if key == self.consumer.key:
            return self.consumer
        return None

    def lookup_nonce(self, oauth_consumer, oauth_token, nonce):
        return None

def auth_test(req):
    os = oauth.OAuthServer(MockOAuthDataStore())
    os.add_signature_method(oauth.OAuthSignatureMethod_HMAC_SHA1())

    # build from request
    base_url = req.is_secure() and 'https://' or 'http://' + req.get_host()
    try:
        os.oauth_request = oauth.OAuthRequest.from_request(
            req.method,
            base_url + req.path,
            headers={'Authorization': req.META.get('HTTP_AUTHORIZATION')},
        )

        consumer, token, params = os.verify_request(os.oauth_request)
    except oauth.OAuthError, err:
        return HttpResponse(err.message, status=401)

    return HttpResponse('OK!')

コンシューマ側

import oauth
import urllib2

CONSUMER_KEY = 'key'
CONSUMER_SECRET = 'secret'

def oauth_request(method, url, parameters=None):
    consumer = oauth.OAuthConsumer(CONSUMER_KEY, CONSUMER_SECRET)
    signature_method_hmac_sha1 = oauth.OAuthSignatureMethod_HMAC_SHA1()

    # access protected resources
    oauth_request = oauth.OAuthRequest.from_consumer_and_token(
	                consumer,
			token=None,
			http_method=method,

			http_url=url,
			parameters=parameters)
    oauth_request.sign_request(signature_method_hmac_sha1, consumer, '')

    headers = oauth_request.to_header()
    
    r = urllib2.Request(url, headers=headers)

    try:
        print urllib2.urlopen(r).read()
    except urllib2.HTTPError, e:
        print e

if __name__ == '__main__':
  oauth_request('GET', 'http://localhost:8080/auth_test/')

Leah Culverさんが書いたOAuthライブラリを利用しているのですが、空の Token だとエラーになるので、一箇所だけソースに手を加えています。

--- oauth.py.org   2008-06-14 11:52:33.000000000 +0900
+++ oauth.py       2008-06-14 12:01:44.000000000 +0900
@@ -311,7 +311,10 @@
         version = self._get_version(oauth_request)
         consumer = self._get_consumer(oauth_request)
         # get the access token
-        token = self._get_token(oauth_request, 'access')
+        try:
+            token = self._get_token(oauth_request, 'access')
+        except:
+            token = ''
         self._check_signature(oauth_request, consumer, token)
         parameters = oauth_request.get_nonoauth_parameters()
         return consumer, token, parameters


このエントリーのはてなブックマーク (-)