HTTPの持続的接続を扱う

持続的接続を扱うと面倒になるという話の続き。
別のサーバにリダイレクトしたときリダイレクト前のソケットは開いたままにするのかというのと、プロキシに関する部分があまり判っていないけど、それ以外の動作は何となく判ってきた。
ただ、持続的接続を扱う関数を作ろうとすると、どういう形で機能を提供すれば良いのかいまいち判らない。APIデザインが本題ではなかったけど、良くないデザインのAPIが生まれるのもしょうがないというか苦労と妥協の結果の場合も多いのだろうと思った(だからといってひどいAPIが正当化されるわけではないけど)。
あとテストを作りながら作業をやっていくつもりだったのに、テストの方は全然作っていない。
とりあえずは次のように扱うことにした。

;; Example: two requests sent over one persistent (keep-alive) connection.
;; First request; ask the server to keep the connection open.
(receive (resp-preface1 socket1)
    (http-send-request "http://d.hatena.ne.jp/" :method 'GET
                       :headers (kvs->headers :connection "Keep-Alive"))
  (call-with-http-input-port resp-preface1 socket1
    (lambda (http-in) 
      (copy-port http-in (current-output-port)))
    :socket-close? #f)  ;; by default the socket is closed once proc returns
  ;; Second request, reusing socket1 via :socket.
  (receive (resp-preface2 socket2)
      (http-send-request "http://d.hatena.ne.jp/keyword/"
                         :method 'GET :socket socket1)
    (call-with-http-input-port resp-preface2 socket2
      (lambda (http-in)
        (print (read-line http-in))))
    (print "socket1=socket2? "
           (equal? socket1 socket2)))) ;; expected to be the same socket when the connection was kept alive

http-send-requestはリクエストを送信して、ステータスコードとヘッダまで読む。送信ヘッダに付け加える内容は直接キーワード引数にするのではなくheadersキーワードをはさむことにした。関数への指示のつもりで書いたキーワード引数が綴り間違いで送信ヘッダに渡っていることが非常に多かったので。
戻り値として本当はステータスコードとヘッダをそのまま返したかったのだけど、リダイレクトしている場合リダイレクト前の情報も欲しいので、そういう情報も全部まとめて最初の戻り値で返す。二つ目の戻り値はソケットで本体を読む直前の状態になっている。
ヘッダまで読む処理とそれ以後の処理を分離するのはあまり望ましくないかもしれないけど、良いやり方が判らなかった。持続的接続で使いまわしたいソケットを表に出すのか(出すとしたらどのように出すか)何か抽象オブジェクトを作るのかとか、同じソケットを使う処理は一つの関数呼び出しで処理するのかとか。
レスポンスの本体はcall-with-http-input-portを使って読む。http-input-portは、本体の長さの示し方がContent-Lengthによるものなのかチャンク形式なのかといったことと関係なくEOFまで読めば良いようになっている。
なるべくGaucheのrfc.httpモジュールの動作に近づけようとしたけど、はっきりと違う動作をしているところは、POSTがリダイレクトされたらGETに変えること(RFC違反だけど)と、multipart/form-data形式のデータのヘッダの値には必ず二重引用符を付けることなど。

Content-Disposition: form-data; name="user"

RFCによれば特殊な文字が入っていない場合は二重引用符は無くてもかまわないはず(ただしRFC 2388の例は二重引用符を付けている)だけど、二重引用符が付いていないと解析できないライブラリが結構ある(PerlのCGIモジュール、RubyのCGIクラスやwebrick/cgi.rbなど。Pythonのcgiモジュールは大丈夫みたい)。なので必ず二重引用符を付けることにした。
Gaucheのrfc.httpのhttp-getやhttp-postは次のように定義できる。エンコード関連のオプションは扱えてないけど、Gauche 0.9のtest/rfc.scmのrfc.httpの部分は通る(キーワードオプションが追加されているので0.9.1のテストは全然通らない)。

(use rfc.http :only (http-compose-query))
;; rfc.http-compatible entry points.  Each one forwards to
;; gauche-http-generic with its HTTP method; GET and HEAD pass #f as the body.
(define (http-get server request-uri . opts)
  (apply gauche-http-generic `(GET ,server ,request-uri #f ,@opts)))

(define (http-head server request-uri . opts)
  (apply gauche-http-generic `(HEAD ,server ,request-uri #f ,@opts)))

(define (http-post server request-uri body . opts)
  (apply gauche-http-generic `(POST ,server ,request-uri ,body ,@opts)))

;; rfc.http-style front end shared by http-get / http-head / http-post.
;;   method      : request method symbol ('GET, 'HEAD, 'POST, ...)
;;   server      : inserted verbatim after "http://" to build the absolute URI
;;   request-uri : a string (path plus optional query), or a list
;;                 (path (name value) ...) handed to http-compose-query
;;   body        : request body passed on to http-send-request (#f for none)
;; Keywords: :no-redirect, :proxy, :sink (port receiving the response body),
;; :flusher (called as (flusher sink headers)).  Any other keyword arguments
;; are sent as request headers.
;; Returns three values: the status code string, the response header list,
;; and the flusher's result (#f for HEAD).
;; Signals an error when request-uri is neither a string nor a list.
(define (gauche-http-generic
          method server request-uri body :key (no-redirect #f) (proxy #f)
          (sink (open-output-string))
          (flusher (lambda (sink _) (get-output-string sink)))
          :allow-other-keys kvs)

  (let* ((request-uri
           (cond
             ((string? request-uri) request-uri)
             ;; list form: (path (name value) ...) -> "path?name=value..."
             ((pair? request-uri)
              (http-compose-query (car request-uri)
                                  (cdr request-uri)))
             (else
              (error "gauche-http-generic: request-uri must be a string or a list:"
                     request-uri))))
         (abs-uri #`"http://,|server|,|request-uri|"))
    (receive (resp-preface socket) (http-send-request
                                     abs-uri :method method :body body
                                     :proxy proxy :no-redirect no-redirect
                                     :headers (apply kvs->headers kvs))
      (let* ((headers (ref resp-preface 'headers))
             (body (call-with-http-input-port
                     resp-preface socket
                     (lambda (http-in)
                       (copy-port http-in sink)
                       (flusher sink headers)))))
        (values (ref resp-preface 'status)
                headers
                (and (not (eq? method 'HEAD)) body))))))

ソース

(define-module low-http
  (use gauche.net)
  (use gauche.vport)
  (use gauche.parameter)
  (use gauche.uvector)
  (use rfc.822)
  (use rfc.uri)
  (use rfc.mime)
  (use file.util)
  (use util.match)
  (export
    http-uri-join http-uri->socket kvs->headers
    <http-response-preface>
    http-send-request open-http-input-port call-with-http-input-port
    ;; rfc.http互換
    http-get http-post http-head http-user-agent
    ))

(select-module low-http)

;; Value sent in the User-Agent request header.  It is a parameter, so
;; callers can override it with `parameterize'.
(define http-user-agent
  (make-parameter (string-append "gauche.http/" (gauche-version))))

;; NOTE (added later): removal of dot segments ("." and "..",
;; RFC 3986 sec. 5.2.4) is not done properly.  In particular, a target whose
;; query contains dots is handled plainly wrong.
;;
;; Resolve TARGET-URI (possibly relative) against BASE-URI and return the
;; resulting URI string.
(define (http-uri-join base-uri target-uri)
  (receive (scheme userinfo host port path query _fragment)
      (uri-parse base-uri)
    ;; Compose a URI that keeps the base URI's scheme, userinfo, host and
    ;; port, with the remaining components given in REST.
    (define (compose-on-base . rest)
      (apply uri-compose :scheme scheme :userinfo userinfo
             :host host :port port rest))
    (cond
      ((string=? target-uri "") base-uri)
      ;; already absolute
      ((receive (target-scheme _) (uri-scheme&specific target-uri)
         target-scheme)
       target-uri)
      ;; fragment only
      ((#/^#/ target-uri)
       => (lambda (m)
            (compose-on-base :path path :query query :fragment (m 'after))))
      ;; network-path reference: only the base scheme is kept
      ((#/^\/\// target-uri)
       (uri-compose :scheme scheme :specific target-uri))
      ;; absolute path
      ((#/^\// target-uri)
       (compose-on-base :path* target-uri))
      ;; query only
      ((#/^\?/ target-uri)
       => (lambda (m)
            (compose-on-base :path path :query (m 'after))))
      ;; relative path: resolved against the directory of the base path
      (else
       (receive (base-dir _name _ext) (decompose-path (or path "/"))
         (compose-on-base
           :path (simplify-path (build-path base-dir target-uri))))))))

;; Reverse a flat key/value list pair by pair:
;; (k1 v1 k2 v2) => (k2 v2 k1 v1).  Each key stays in front of its value.
(define (kvs-reverse xs)
  (do ((rest xs (cddr rest))
       (acc '() (cons* (car rest) (cadr rest) acc)))
      ((null? rest) acc)))

;; Turn a flat list of alternating names and values into a header list
;; ((name value) ...), converting both with x->string.  A trailing name
;; with no value after it is dropped.
(define (kvs->headers . opts)
  (let collect ((rest opts))
    (if (and (pair? rest) (pair? (cdr rest)))
      (cons (list (x->string (car rest)) (x->string (cadr rest)))
            (collect (cddr rest)))
      '())))

;; Write an HTTP/1.1 request for URI to OPORT.
;;   method  : written verbatim on the request line
;;   headers : extra ((name value) ...) headers, written after the generated
;;             Host and User-Agent headers
;;   body    : required for POST and PUT (see http-write-body for formats);
;;             ignored for other methods
;;   proxy   : when true, the absolute URI is used as the request target
;;             instead of path + query
;; Signals an error when POST/PUT is requested without :body.
;; OPORT is flushed at the end, so the whole request has been handed to the
;; port's destination before the caller starts reading the response.
(define (http-write-request oport uri :key (method 'GET) (headers '())
                             (body #f) (proxy #f))
  (receive (_scheme _userinfo host-name port-num path query _fragment)
      (uri-parse uri)
    (let ((host (if port-num #`",|host-name|:,|port-num|" host-name))
          (request-uri (if proxy
                         uri
                         (call-with-output-string
                           (lambda (out)
                             (format out "~a" (or path "/"))
                             (when query (format out "?~a" query)))))))
      (format oport "~a ~a HTTP/1.1\r\n" method request-uri)
      (http-write-headers oport
                          (kvs->headers :Host host
                                        :User-Agent (http-user-agent))
                          :continue? #t)))
  ;; TODO: when a proxy is used, should the Connection header be rewritten
  ;; as Proxy-Connection?
  (http-write-headers oport headers :continue? #t)
  (if (or (eq? method 'POST) (eq? method 'PUT))
    (begin
      (unless body
        (errorf "http-write-request: method ~a require :body argument"
                method))
      (http-write-body oport body))
    (display "\r\n" oport))
  ;; Not every path above flushes (the bodiless path and the multipart
  ;; writer do not), so do it once here for all of them.
  (flush oport))

;; Write each (name value) entry of HEADERS to OPORT as "name: value\r\n".
;; Unless :continue? is true, also write the blank line that ends the
;; header section.
(define (http-write-headers oport headers :key (continue? #f))
  (for-each (lambda (entry)
              (format oport "~a: ~a\r\n" (car entry) (cadr entry)))
            headers)
  (if (not continue?)
    (format oport "\r\n")))

;; Write the request body to OPORT, preceded by its Content-Length (and,
;; for multipart, Content-Type) headers and the blank line that ends the
;; header section.  BODY may be:
;;   - a string                          : sent as-is
;;   - a list whose first element is a string : that string names a file
;;                                         whose contents are sent
;;   - any other non-empty list           : form parameters sent as
;;                                         multipart/form-data
;; Anything else, including the empty list, signals "bad body format".
(define (http-write-body oport body)
  (cond
    ((string? body)
     (http-write-string-body oport body))
    ;; Reject the empty list here rather than crashing on (car '()).
    ((or (null? body) (not (list? body)))
     (error "http-write-body: bad body format"))
    ((string? (car body))
     (http-write-file-body oport (car body)))
    (else
     (http-write-multipart-body oport body))))

;; Write STR as a request body: a Content-Length header, the blank line that
;; ends the header section, then the string itself, and flush OPORT.
;; Content-Length counts octets, so the size comes from `string-size'
;; (bytes) rather than `string-length' (characters, which undercounts
;; multibyte text such as Japanese).
(define (http-write-string-body oport str)
  (define size (string-size str))
  (format oport "Content-Length: ~d\r\n" size)
  (format oport "\r\n")
  (display str oport)
  (flush oport))

;; Send the contents of FILE as a request body: Content-Length (the file size
;; in bytes), the blank line ending the header section, then the file's
;; contents copied onto OPORT, which is flushed at the end.
(define (http-write-file-body oport file)
  (let1 size (file-size file)
    (format oport "Content-Length: ~d\r\n" size)
    (format oport "\r\n")
    (call-with-input-file file
      (lambda (in) (copy-port in oport)))
    (flush oport)))

;;; For multipart bodies.
;;; Like mime-compose-parameters, but always wrap each value in double
;;; quotes (name="val"), because quite a few libraries cannot parse
;;; unquoted values correctly.
(define (mime-compose-param-q params)
  (with-output-to-string
    (lambda ()
      (for-each
        (lambda (param)
          (let1 composed (mime-compose-parameters (list param) #f)
            ;; A value that mime-compose-parameters already quoted has a `"'
            ;; after the `=', so this regexp leaves it untouched.
            (display (regexp-replace #/=([^"]*)$/ composed "=\"\\1\""))))
        params))))

;; Convert one form parameter into a multipart body part.
;; PARAM is (name value), (name :value v key val ...) or
;; (name :file path key val ...); extra key/value pairs are passed on to
;; make-value-part / make-file-part as additional part headers.
;; :file takes precedence over :value.
;; Signals an error when neither :value nor :file is given.
(define (body-param->body-part param)
  (match param
    ((name value) (body-param->body-part `(,name :value ,value)))
    ((name . opts)
     (let-keywords opts ((value #f)
                         (file #f)
                         . kvs)
       ;; NOTE(review): kvs-reverse flips the order of the leftover pairs,
       ;; presumably to restore the caller's order after let-keywords --
       ;; confirm.
       (let1 kvs (kvs-reverse kvs)
         (cond
           (file (apply make-file-part name file kvs))
           (value (apply make-value-part name value kvs))
           ;; Fail here with a clear message instead of handing an
           ;; unspecified value to the size computation and the writer.
           (else
            (error "body-param->body-part: parameter needs :value or :file"
                   param))))))))

;; Build a multipart body part for a plain form value and return it as one
;; string: the part headers (Content-Disposition: form-data; name="NAME",
;; then any extra headers from KVS), the blank line, then VALUE as displayed.
(define (make-value-part name value . kvs)
  (let1 disposition
      (string-append "form-data"
                     (mime-compose-param-q (list (cons "name" name))))
    (with-output-to-string
      (lambda ()
        (http-write-headers (current-output-port)
                            (apply kvs->headers
                                   :Content-Disposition disposition
                                   kvs))
        (display value)))))

;; Build a multipart body part for a file upload.  Returns a list
;; (HEADER-STRING PATH): the part headers (Content-Disposition with name and
;; filename, extra headers from KVS, and the terminating blank line) and
;; the path whose contents are streamed after them.
;; Only the base name of FILE goes into filename="...", so the local
;; directory layout is not disclosed to the server (see RFC 7578 sec. 4.2).
(define (make-file-part name file . kvs)
  (let1 header (call-with-output-string
                 (lambda (str-out)
                   (http-write-headers
                     str-out
                     (apply kvs->headers
                            :Content-Disposition
                            (string-append
                              "form-data"
                              (mime-compose-param-q (list (cons "name" name)))
                              (mime-compose-param-q
                                (list (cons "filename"
                                            (sys-basename (x->string file))))))
                            kvs))))
    (list header (x->string file))))

;; Convert every form parameter in PARAMS into a multipart body part,
;; keeping their order.
(define (body-params->body-parts params)
  (let loop ((rest params) (acc '()))
    (if (null? rest)
      (reverse acc)
      (loop (cdr rest) (cons (body-param->body-part (car rest)) acc)))))

;; Number of bytes write-multipart-parts will emit for PARTS and BOUNDARY:
;; one delimiter line per part plus the part itself (a string, or a
;; (header-string file-path) pair whose file is counted by its size), then
;; the closing delimiter line.
(define (body-parts-size parts boundary)
  (let* ((boundary-size (string-size boundary))
         (delimiter-size (+ boundary-size 6))   ; "\r\n--" boundary "\r\n"
         (close-size (+ boundary-size 8)))      ; "\r\n--" boundary "--\r\n"
    (define (part-size part)
      (if (string? part)
        (string-size part)
        (+ (string-size (car part)) (file-size (cadr part)))))
    (do ((rest parts (cdr rest))
         (sum 0 (+ sum delimiter-size (part-size (car rest)))))
        ((null? rest) (+ sum close-size)))))

;; Convert PARAMS (form parameters) into multipart parts and write them to
;; OPORT separated by BOUNDARY.  No HTTP headers are written here.
(define (write-multipart-params oport params boundary)
  (let1 parts (body-params->body-parts params)
    (write-multipart-parts oport parts boundary)))

;; Write PARTS to OPORT, each preceded by a "\r\n--BOUNDARY\r\n" delimiter,
;; followed by the closing "\r\n--BOUNDARY--\r\n".  A string part is written
;; as-is; a (header-string file-path) part writes the header string and then
;; copies the file's contents.
(define (write-multipart-parts oport parts boundary)
  (let ((delimiter #`"\r\n--,|boundary|\r\n")
        (close-delimiter #`"\r\n--,|boundary|--\r\n"))
    (for-each
      (lambda (part)
        (display delimiter oport)
        (if (string? part)
          (display part oport)
          (begin
            (display (car part) oport)
            (call-with-input-file (cadr part)
              (lambda (file-in) (copy-port file-in oport))))))
      parts)
    (display close-delimiter oport)))

;; Send PARAMS as a multipart/form-data body: Content-Type (with a freshly
;; generated boundary) and Content-Length headers, the blank line, then the
;; parts.  Content-Length is computed up front from the prepared parts.
(define (http-write-multipart-body oport params)
  (let* ((boundary (mime-make-boundary))
         (parts (body-params->body-parts params)))
    (http-write-headers
      oport
      (list (list "Content-Type"
                  #`"multipart/form-data; boundary=,|boundary|")
            (list "Content-Length"
                  (x->string (body-parts-size parts boundary)))))
    (write-multipart-parts oport parts boundary)))

;; Read the status line from IPORT and return the three-digit status code
;; as a string (e.g. "200").  The reason phrase may be missing, so the code
;; may be followed either by whitespace or by the end of the line.
;; Signals an error on EOF or when no status code can be found.
(define (http-read-status-line iport)
  (let1 line (read-line iport)
    (cond
      ((eof-object? line) (error "response no data"))
      ((#/\w+\s+(\d\d\d)(?:\s|$)/ line) => (lambda (rxm) (rxm 1)))
      (else (error "http status line bad format" line)))))

;; Return an input port that yields the body of the response described by
;; RESPONSE-PREFACE, read from SOCKET:
;;   - HEAD requests and 1xx/204/304 responses : an empty port
;;   - Transfer-Encoding: chunked              : a chunk-decoding port
;;   - Content-Length                          : a port limited to that length
;;   - otherwise                               : the raw socket input port
;;                                               (EOF when the peer closes)
;; Header names are looked up in lower case, as produced by
;; rfc822-read-headers.
(define (open-http-input-port socket response-preface)
  (define (no-body-status? status)
    (or (#/^1..$/ status) (#/^204$/ status) (#/^304$/ status)))
  ;; Transfer-coding names are case-insensitive (RFC 7230 sec. 4), so
  ;; "Chunked" must be recognized too.
  (define (chunked-body? headers)
    (and-let* ((pair (assoc "transfer-encoding" headers)))
      (#/chunked/i (cadr pair))))
  (define (get-content-length headers)
    (and-let* ((pair (assoc "content-length" headers)))
      (string->number (cadr pair))))

  (let ((iport (socket-input-port socket))
        (status (ref response-preface 'status))
        (headers (ref response-preface 'headers))
        (method (ref response-preface 'used-method)))
    (cond
      ((or (equal? method 'HEAD) (no-body-status? status))
       (input-port-for-limited-length iport 0))
      ((chunked-body? headers)
       (open-input-port-for-decoding-chunked iport))
      ((get-content-length headers)
       => (lambda (len)
            (input-port-for-limited-length iport len)))
      (else iport))))

;; Return a port that reads at most LEN bytes from IPORT and then reports
;; EOF.  The remaining count lives in a parameter object used as a mutable
;; cell, which read-body! decrements.
(define (input-port-for-limited-length iport len)
  (let1 remaining (make-parameter len)
    (make <buffered-input-port>
          :fill (lambda (buf)
                  (if (zero? (remaining))
                    (eof-object)
                    (read-body! buf iport remaining))))))

;; Wrap IPORT, positioned at the start of a chunked body, in a port that
;; yields the decoded body data and then EOF.  Chunk-size lines, the CRLF
;; after each chunk, and the trailer section (read with rfc822-read-headers
;; up to its blank line) are consumed from IPORT.
;; Signals "chunked body bad format" on EOF or a malformed chunk-size line.
(define (open-input-port-for-decoding-chunked iport)
  ;; State (parameter object used as a mutable cell): 'start before the
  ;; first chunk, the number of bytes left in the current chunk, or EOF once
  ;; the last chunk has been processed.
  (define param-rest-len (make-parameter 'start))
  ;; Parse a chunk-size line; a chunk extension after the hex digits is
  ;; ignored.
  (define (read-chunk-size iport)
    (let1 line (read-line iport)
      (when (eof-object? line)
        (error "chunked body bad format" line))
      (cond
        ((#/^([[:xdigit:]]+)/ line)
         => (lambda (rxm) (string->number (rxm 1) 16)))
        ;; `else' is required: a bare (error ...) clause would treat the
        ;; procedure `error' as a true test and return LINE as the size.
        (else (error "chunked body bad format" line)))))

  (make <buffered-input-port>
        :fill
        (lambda (u8vec)
          (if (eof-object? (param-rest-len))
            (eof-object)
            (begin
              (cond
                ((eq? (param-rest-len) 'start)
                 (param-rest-len (read-chunk-size iport)))
                ((= (param-rest-len) 0)
                 (read-line iport) ; skip the CRLF ending the previous chunk
                 (param-rest-len (read-chunk-size iport)))
                (else 'nop))
              (if (= (param-rest-len) 0)
                (begin
                  ;; last-chunk: consume the trailer up to its blank line,
                  ;; then report EOF explicitly (not the setter's result).
                  (rfc822-read-headers iport)
                  (param-rest-len (eof-object))
                  (eof-object))
                (read-body! u8vec iport param-rest-len)))))))

;; Read up to (min (u8vector-length u8vec) (param-rest-len)) bytes from IPORT
;; into the front of U8VEC, subtract the count from the remaining length kept
;; in PARAM-REST-LEN (a parameter used as a mutable cell) and return the
;; number of bytes read.
;; Signals an error if IPORT reaches EOF while body bytes are still expected
;; (e.g. the peer closed the connection early).
(define (read-body! u8vec iport param-rest-len)
  (let* ((len (min (u8vector-length u8vec)
                   (param-rest-len)))
         (read-len (read-block! u8vec iport 0 len)))
    (when (eof-object? read-len)
      (errorf "premature EOF in HTTP body (~a bytes still expected)"
              (param-rest-len)))
    (param-rest-len (- (param-rest-len) read-len))
    read-len))

;; Open a TCP connection to the host and port named by URI, using port 80
;; when the URI has none.
(define (http-uri->socket uri)
  (receive (_scheme _userinfo host port _path _query _fragment)
      (uri-parse uri)
    (let1 port-number (if port (x->integer port) 80)
      (make-client-socket 'inet host port-number))))

;; Connect to a "host:port" string (as given for :proxy).  A string without
;; a ":port" part is used as the host name with port 80.
(define (host:port->socket host:port)
  (let1 m (#/([^:]+):(\d+)/ host:port)
    (if m
      (make-client-socket (m 1) (x->integer (m 2)))
      (make-client-socket host:port 80))))

;; Status line and headers of one HTTP response, read before its body.
;; Responses along a redirect chain are linked through `previous'.
(define-class <http-response-preface> ()
  ((status :init-keyword :status)           ; status code string from http-read-status-line, e.g. "200"
   (headers :init-keyword :headers)         ; header list from rfc822-read-headers
   (content-uri :init-keyword :content-uri) ; URI that was requested
   (previous :init-keyword :previous)       ; preface of the response that redirected here, or #f
   (used-method :init-keyword :used-method)))  ; method actually sent (POST becomes GET after a redirect)

;; List the URIs requested along the redirect chain that ends with
;; RESPONSE-PREFACE, oldest first, by following the `previous' slots.
(define (uri-history response-preface)
  (do ((preface response-preface (ref preface 'previous))
       (uris '() (cons (ref preface 'content-uri) uris)))
      ((not preface) uris)))

;; True when URI1 and URI2 have the same scheme, host and port.  It is used
;; to decide whether a kept-alive connection can serve a redirect target.
;; A missing port defaults to 80 for http and 443 for https.  Scheme and
;; host names are case-insensitive, so they are compared with string-ci=?.
;; Signals an error (via errorf, so the scheme is formatted into the
;; message) for any other scheme.
(define (host-port=? uri1 uri2)
  (define (port-number scheme port)
    (cond
      (port)
      ((string-ci=? scheme "http") 80)
      ((string-ci=? scheme "https") 443)
      (else (errorf "host-port=?: unsupported scheme ~a" scheme))))
  (receive (scheme1 _info1 host1 port1 . _rest1) (uri-parse uri1)
    (receive (scheme2 _info2 host2 port2 . _rest2) (uri-parse uri2)
      (and
        (string-ci=? scheme1 scheme2)
        (string-ci=? host1 host2)
        (= (port-number scheme1 port1)
           (port-number scheme2 port2))))))

;; True when HEADERS (with lower-case names) has a Connection or
;; Proxy-Connection header whose value is "Keep-Alive", compared
;; case-insensitively; #f otherwise.
(define (keep-alive? headers)
  (define (says-keep-alive? field)
    (let1 entry (assoc field headers)
      (and entry (string-ci=? (cadr entry) "Keep-Alive"))))
  (or (says-keep-alive? "connection")
      (says-keep-alive? "proxy-connection")))

;; Send a request for URI and read the response status line and headers,
;; following redirects unless :no-redirect is true.
;;   socket : already-connected socket to reuse; when #f a new connection
;;            is opened
;;   method, headers, body, proxy : passed on for every request sent
;; Returns two values: the <http-response-preface> of the final response
;; (earlier responses are reachable through its `previous' slot) and the
;; socket, positioned just before the response body.
;; Signals an error when a redirect target was already visited or the
;; redirect chain grows too long.
(define (http-send-request
          uri :key (no-redirect #f)
          (socket #f) (method 'GET) (headers '()) (body #f) (proxy #f))
  ;; Error out if URI already appears in the chain ending at INIT-PREFACE,
  ;; or once the chain is longer than 10 responses.
  (define (check-looping uri init-preface)
    (let loop ((n 1)
               (preface init-preface))
      (cond
        ((not preface) #t)
        ((or (> n 10) (string=? uri (ref preface 'content-uri)))
         (errorf "http-send-request: redirect loop? ~a; visited URIs: ~a"
                 uri (uri-history init-preface)))
        (else (loop (+ n 1) (ref preface 'previous))))))

  ;; Location to follow, or #f when redirects are disabled or the response
  ;; is not a 3xx with a Location header.
  (define (redirect? no-redirect response-preface)
    (define (redirect-status? status) (#/3../ status))
    (and
      (not no-redirect)
      (redirect-status? (ref response-preface 'status))
      (get-location response-preface)))

  (define (get-location response-preface)
    (and-let* ((pair (assoc "location" (ref response-preface 'headers))))
      (cadr pair)))

  (define request-headers headers)
  (let try ((uri uri)
            (prev-socket socket)
            (prev-preface #f))
    (check-looping uri prev-preface)
    ;; Deliberately deviates from RFC 2616 sec. 10.3.3 to stay on the safe
    ;; side: once redirected, a POST is re-issued as GET.
    (let1 method (if (and prev-preface (eq? method 'POST))
                   'GET
                   method)
      (receive (resp-preface socket)
        (http-send-request-no-redirect
          uri :prev-preface prev-preface :socket prev-socket :method method
          :headers request-headers :body body :proxy proxy)
        (if (not (redirect? no-redirect resp-preface))
          (values resp-preface socket)
          (let* ((location-uri (http-uri-join uri (get-location resp-preface)))
                 (next-socket (cond
                                ((and
                                   (keep-alive? (ref resp-preface 'headers))
                                   (host-port=? uri location-uri)
                                   ;; Some servers answer HEAD with a body,
                                   ;; so after HEAD the connection is closed
                                   ;; even when Keep-Alive was announced.
                                   (not (eq? method 'HEAD)))
                                 (throw-away-body socket resp-preface)
                                 socket)
                                (else (socket-close socket) #f))))
            (try location-uri next-socket resp-preface)))))))

;; Send a single request for URI (no redirect handling) and read the status
;; line and headers of the response.
;;   socket       : connected socket to reuse; when #f a new connection is
;;                  opened to the proxy (if :proxy is given) or to URI's host
;;   prev-preface : stored in the new preface's `previous' slot
;; Returns two values: a new <http-response-preface> and the socket used.
;; If writing the request or reading the response fails, a socket opened
;; here is closed before the condition is re-raised; a socket supplied by
;; the caller is left to the caller.
(define (http-send-request-no-redirect
          uri :key (prev-preface #f)
          (socket #f) (method 'GET) (headers '()) (body #f) (proxy #f))
  (let* ((own-socket? (not socket))
         (socket (cond
                   (socket socket)
                   (proxy (host:port->socket proxy))
                   (else (http-uri->socket uri))))
         (sock-in (socket-input-port socket))
         (sock-out (socket-output-port socket))
         (preface
           (guard (e (else (when own-socket? (socket-close socket))
                           (raise e)))
             (http-write-request sock-out uri
                                 :method method :headers headers
                                 :body body :proxy proxy)
             (receive (status response-headers)
                 (http-read-status&headers sock-in)
               (make <http-response-preface>
                     :status status
                     :headers response-headers
                     :content-uri uri
                     :previous prev-preface
                     :used-method method)))))
    (values preface socket)))

;; Read the status line and then the header section from IPORT.
;; Returns two values: the status code string and the header list.
(define (http-read-status&headers iport)
  (let1 status (http-read-status-line iport)
    (values status (rfc822-read-headers iport))))

;; Read and discard the body of the response described by RESPONSE-PREFACE
;; so that SOCKET is positioned after it.
(define (throw-away-body socket response-preface)
  (throw-away-input-port-content
    (open-http-input-port socket response-preface)))

;; Read IPORT until EOF and discard everything read.
;; The data is read in 4096-byte blocks and dropped in-process, so no
;; "/dev/null" file is needed (it does not exist on every platform Gauche
;; runs on).
(define (throw-away-input-port-content iport)
  (let loop ()
    (unless (eof-object? (read-block 4096 iport))
      (loop))))

;; Call PROC with a port that yields the body of the response described by
;; RESPONSE-PREFACE, and return PROC's result.
;; Afterwards SOCKET is closed, unless :socket-close? is #f and the response
;; announced Keep-Alive.  In that case whatever body PROC left unread is
;; discarded, so SOCKET can carry the next request.
;; If anything raises, SOCKET is closed and the condition is re-raised.
(define (call-with-http-input-port response-preface socket proc
                                   :key (socket-close? #t))
  (guard (e (else (socket-close socket) (raise e)))
    (let* ((http-in (open-http-input-port socket response-preface))
           (result (proc http-in))
           (reuse? (and (not socket-close?)
                        (keep-alive? (ref response-preface 'headers)))))
      (if reuse?
        (throw-away-input-port-content http-in)
        (socket-close socket))
      result)))

;;; rfc.http 互換
(use rfc.http :only (http-compose-query))
;; rfc.http-compatible entry points.  Each one forwards to
;; gauche-http-generic with its HTTP method; GET and HEAD pass #f as the body.
(define (http-get server request-uri . opts)
  (apply gauche-http-generic `(GET ,server ,request-uri #f ,@opts)))
(define (http-head server request-uri . opts)
  (apply gauche-http-generic `(HEAD ,server ,request-uri #f ,@opts)))
(define (http-post server request-uri body . opts)
  (apply gauche-http-generic `(POST ,server ,request-uri ,body ,@opts)))

;; rfc.http-style front end shared by http-get / http-head / http-post.
;;   method      : request method symbol ('GET, 'HEAD, 'POST, ...)
;;   server      : inserted verbatim after "http://" to build the absolute URI
;;   request-uri : a string (path plus optional query), or a list
;;                 (path (name value) ...) handed to http-compose-query
;;   body        : request body passed on to http-send-request (#f for none)
;; Keywords: :no-redirect, :proxy, :sink (port receiving the response body),
;; :flusher (called as (flusher sink headers)).  Any other keyword arguments
;; are sent as request headers.
;; Returns three values: the status code string, the response header list,
;; and the flusher's result (#f for HEAD).
;; Signals an error when request-uri is neither a string nor a list.
(define (gauche-http-generic
          method server request-uri body :key (no-redirect #f) (proxy #f)
          (sink (open-output-string))
          (flusher (lambda (sink _) (get-output-string sink)))
          :allow-other-keys kvs)

  (let* ((request-uri
           (cond
             ((string? request-uri) request-uri)
             ;; list form: (path (name value) ...) -> "path?name=value..."
             ((pair? request-uri)
              (http-compose-query (car request-uri)
                                  (cdr request-uri)))
             (else
              (error "gauche-http-generic: request-uri must be a string or a list:"
                     request-uri))))
         (abs-uri #`"http://,|server|,|request-uri|"))
    (receive (resp-preface socket) (http-send-request
                                     abs-uri :method method :body body
                                     :proxy proxy :no-redirect no-redirect
                                     :headers (apply kvs->headers kvs))
      (let* ((headers (ref resp-preface 'headers))
             (body (call-with-http-input-port
                     resp-preface socket
                     (lambda (http-in)
                       (copy-port http-in sink)
                       (flusher sink headers)))))
        (values (ref resp-preface 'status)
                headers
                (and (not (eq? method 'HEAD)) body))))))