30 сентября 2019

Delphi & Elasticsearch. Загрузка и индексирование файлов

    В статье "Полнотекстовый поиск по содержимому файлов" я рассказывал, что нужно сделать для загрузки файла на индексацию в Elasticsearch. Теперь подробно рассмотрим реализацию этого процесса на Delphi с использованием стандартных классов HttpClient и NetEncoding.
    Думаю, что будет более понятно если я включу описание реализации в качестве комментариев в текст процедуры индексации файла.
uses
  System.Classes,
  System.SysUtils,
  System.StrUtils,
  System.Hash,
  System.NetEncoding,
  System.Net.HttpClient,
  System.Net.HttpClientComponent;

const
  csBaseUrl = 'http://localhost:9200/';
...

// sIndexName - наименование индекса
// sFileName  - наименования файла
function esIndexing(const sIndexName, sFileName: String): Boolean;
var
  fs: TFileStream;
  iFileSize: Int64;
  baBuffer: TBytes;
  en: TEncoding;
  FileMD5: THashMD5;
  enBase64: TBase64Encoding;

  http: TNetHTTPClient;
  Response: IHTTPResponse;
  ss: TStringStream;
begin
  Result := False;
  try
    // открываем файловый поток
    fs := TFileStream.Create(sFileName, fmOpenRead + fmShareDenyWrite);
    try
      iFileSize := fs.Size; // размер файла
      if iFileSize = 0
        then Writeln('File is empty')
        else begin
          // читаем содержимое файла в буфер
          SetLength(baBuffer, iFileSize);
          fs.Read(baBuffer, iFileSize);

          // в качестве ID документа в Elasticsearch
          // будем использовать MD5 файла
          FileMD5 := THashMD5.Create;
          FileMD5.Update(baBuffer, iFileSize);

          // определяем кодировку файла
          en := nil;
          TEncoding.GetBufferEncoding(baBuffer, en);

          // содержимое файла нужно перекодировать в Base64.
          // т.к. TNetEncoding.Base64.EncodeBytesToString
          // разбивает результат на строки по 76 символов
          // создаем экземпляр класса TBase64Encoding сами
          // с параметром CharsPerLine = 0
          enBase64 := TBase64Encoding.Create(0);
          try
            // содержимое буфера конвертируем в строку
            // с полученной выше кодировкой, затем строку
            // перекодируем в Base64 и включаем в текст запроса
            ss := TStringStream.Create('{"data": "' +
                  enBase64.Encode(en.GetString(baBuffer)) + '"}');
            try
              http := TNetHTTPClient.Create(nil);
              try
                http.AcceptCharSet := 'UTF-8';
                http.ContentType := 'application/json';
                // посылаем PUT запрос по адресу состоящему из:
                // - адреса сервера Elasticsearch (csBaseUrl)
                // - наименования индекса (sIndexName)
                // - ID документа (MD5 файла)
                // - наименования конвейера ("attachment")
                Response := http.Put(csBaseUrl + sIndexName +
                                     '/_doc/' + FileMD5.HashAsString +
                                     '?pipeline=attachment', ss);
                // анализируем ответ сервера
                case Response.StatusCode of
                  201: begin // документ создан
                         Writeln('Created: ' + Response.ContentAsString);
                         Result := True;
                       end;
                  200: begin // документ обновлен
                         Writeln('Updated: ' + Response.ContentAsString);
                         Result := True;
                       End
                  // сервер вернул ошибку
                  else Writeln('Error:' +
                               ' StatusCode=' + Response.StatusCode.toString +
                               ' StatusText="' + Response.StatusText + '"' +
                               ' Content="' + Response.ContentAsString + '"');
                end;
              finally
                http.Free;
              end;
            finally
              ss.Free
            end;
          finally
           enBase64.Free
          end;
        end;
    finally
      SetLength(baBuffer, 0);
      fs.Free;
    end;
  except
   on E: Exception do
     Writeln(E.ClassName, ': ', E.Message);
  end;
end;

Рассмотрим примеры результатов работы функции:

1. Результат успешного создания документа:
{"_index":"files","_type":"_doc","_id":"c14067733508057b6ba1008156b29cd5","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":11,"_primary_term":1}
В содержимом ответа сервера мы видим, что в базе Elasticsearch в индексе "files" была успешно создана ("created") 1-я версия ("_version":1) документа с ID = c14067733508057b6ba1008156b29cd5 (MD5 моего тестового файла).

2. Результат успешного обновления документа:
{"_index":"files","_type":"_doc","_id":"c14067733508057b6ba1008156b29cd5","_version":2,"result":"updated","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":12,"_primary_term":1}
В содержимом ответа сервера мы видим, что в результате успешного обновления ("updated") документа с ID = c14067733508057b6ba1008156b29cd5 была создана его 2-я версия.

3. Результат ошибочного запроса
Error: StatusCode=400 StatusText="Bad Request" Content="{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"pipeline with id [attachment2] does not exist"}],"type":"illegal_argument_exception","reason":"pipeline with id [attachment2] does not exist"},"status":400}"
В содержимом ответа сервера мы видим, что в запросе указано имя не существующего конвейера "attachment2".

    При массовой индексации файлов создание объекта TNetHTTPClient лучше вынести за пределы функции и создавать его перед началом индексации. Данная функция не требует установки в Delphi сторонних библиотек и успешно работает как под Windows, так и под Linux.

Комментариев нет:

Отправить комментарий