

Meu trabalho de conclusão de curso foi sobre um sistema de comunicação de bibliotecas. Desenvolvi o trabalho inicialmente nas aulas de XP e mais tarde, junto com o co-autor da monografia, Leandro Bororo.
Todo o trabalho gira em torno de dois problemas: exportar os dados da base para um formato especial, denominado MARC, e importar os dados desse formato para o banco. Podemos separar isso na discussão legal e na discussão chata. A discussão chata é justamente os objetivos básicos do projeto supracitados. A discussão legal é como criar uma interface WEB para esse componente que permita o acompanhamento do processo de importação ou exportação. Obviamente vou falar sobre o segundo caso.
Desenvolver uma interface WEB para o MARC não é nenhum problema. No entanto, acompanhar a importação/exportação e permitir que o sistema trabalhe com arquivos grandes (mais de 10 megabytes) é outra história. A própria natureza de uma aplicação WEB torna a solução síncrona problemática. A análise de um arquivo MARC pode demorar bastante. O suficiente para a conexão HTTP (cliente-servidor) finalizar por timeout. Além disso, não seria a implementação mais responsiva (se todas as informações são direcionadas para a saída padrão, essas serão exibidas com sorte na linha de comando ou em algum arquivo de registro do servidor, em todo caso, não visível para o cliente). Para resolver isso, o tratamento da informação deveria ser assíncrono.
Inicialmente, poderíamos pensar em enviar as mensagens diretamente para o browser. Mas isso não é possível fazer diretamente. De fato, é o navegador faz as solicitações (request) de um certo recurso (disponível numa localização específica - URL). Uma alternativa seria o uso de Ajax (Asynchronous Javascript And XML) que daria a sensação de que o browser é capaz receber notificações do servidor. Na verdade, no Ajax, é o navegador que faz chamadas remotas de tempos em tempos (sem que o usuário tome alguma ação específica). O fato de a primeira requisição bloquear.
Abaixo, segue o funcionamento da comunicação síncrona entre Browser e servidor que mostra porque o browser irá bloquear ao enviar o arquivo para processamento:
1 2 3 4 5 6 7 8 9 10 | public Response processar (Request request) { // magia negra para pegar arquivo do request File file = pegaArquivo(request); // magia negra para processar o arquivo Resposta resposta = processa(arquivo); // magia para transformar resposta em response return transforma(resposta); } |
A conversa do navegador com o servidor é feito por ciclos de requisições e respostas. Se for necessário que numa requisição o servidor processe um dado arquivo, é preciso esperar todo o processamento para se obter a resposta do servidor. Pode-se separar o processamento da submissão do arquivo, delegando a responsabilidade para outro processo. Exemplo:
1 2 3 4 5 6 7 8 | public Response processar (Request request) { // magia negra para pegar arquivo do request File file = pegaArquivo(request); // apenas grava o arquivo.. uma outra linha de execução processa o arquivo. grava(arquivo); return paginaSucesso(); } |
Se o processamento do arquivo é feito na mesma linha de execução da submissão, o cliente só terá uma resposta após o processamento do arquivo.
Para o servidor processar o arquivo de forma assíncrona é necessário que primeiro um "Servlet" processe e grava o arquivo num repositório temporário (seja ele uma fila JMS, um caixa de email, um diretório, etc) e um outro processo, de tempos em tempos, verifique o conteúdo desse repositório.
Esse outro processo pode atualizar um arquivo de log a medida que for necessário (por exmplo: a medida que os registros Marcs forem processados) . Dessa forma, o cliente poderá "requisitar" o conteúdo do arquivo quantas vezes quiser.
1 2 3 4 5 6 7 | // isso é fácil... public Resposta processar(Request request) { descobreIdArquivo(request); // conteúdo atual do arquivo String arquivo = leArquivo(idArquivo); return converteParaResponse(arquivo); } |
A síntese da solução é a seguinte: Num primeiro momento o usuário faria o "upload" do arquivo. Depois o usuário deveria listar o arquivo em importação e consultar os detalhes da importação. O mesmo pode ser feito para a exportação. Uma fila de mensagens assíncronas e um processo em execução paralela devem ser implementados.
Implementamos o processo da seguinte maneira:
O servidor recebe um arquivo, via FormFile (essa classe é um utilitário da biblioteca commons-upload [do projeto Commons da Apache Software Foundation] para tratar multipart/form-data de uma requisição HTTP) e o grava num diretório específico. Em seguida, envia um objeto comando como mensagem para uma fila (com o nome do arquivo já atribuído a uma de suas propriedades). Por outro lado, o sistema mantém uma reserva (pool) de trabalhadores que consomem os "trabalhos" enviados para a fila. Cada processamento é registrado por um registrador (logger) num mapa cuja chave é o nome do arquivo. Um Servlet controla o ciclo de vida desses trabalhadores. Os trabalhadores devem por isso, realizar pequenos blocos de operações e verificar se pode executar o próximo bloco, para evitar bloquear o servidor caso seja necessário reiniciá-lo. Abaixo, segue como foi implementado o trabalhador:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | @Override public void run() { try { while (condicao) { Command command = works.take(); if (NO_MORE_WORK == command) { break ; } try { while (command.hasWork() && condicao) { command.process(); } } catch (Exception e) { e.printStackTrace(); } command.finish(); } } catch (InterruptedException e) { } } |
Nesse trecho de código, o objeto works representa a fila de trabalhos (BlockingQueue do pacote utilitário de concorrência do Java 1.5 e superiores). Ele será o repositório da comunicação Produtor-Consumidor. Command é a classe que implementa o padrão Command [Gof], o comando que será executado. A variável condicao permite que o trabalhador verifique se deve continuar a execução do trabalho atual. Aqui a implementação do comando para a Importação do MARC:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | private class ImportMarcCommand extends Command { private LenientMarcStreamReader reader; private MarcManager marcManager; private FileInputStream inputStream; private String nomeArquivo; private FileChannel channel; @Override public boolean hasWork() { return reader.hasNext(); } @Override public void process() throws Exception { marcManager.parse(reader.next()); marcManager.getLog().setAvailable(channel.position()); } public ImportMarcCommand(String nomeDoArquivo, boolean sobrescreveItem, boolean sobrescreveExemplar) throws NamingException, IOException { marcManager = new MarcManager(sobrescreveItem, sobrescreveExemplar); Logger.getInstance().getLogs().add(marcManager.getLog()); this .nomeArquivo = nomeDoArquivo; File file = new File(nomeDoArquivo); inputStream = new FileInputStream(file); channel = inputStream.getChannel(); marcManager.getLog().setInitialAvailable(file.length()); reader = new LenientMarcStreamReader(inputStream); } @Override public void finish() { marcManager.getLog().setFinalDate( new Date()); // vou fechar o stream por educação. se eu não conseguir, não vou // ligar. if (inputStream != null ) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } // tenta apagar educadamente. File f = new File(nomeArquivo); if (!f.exists() && !f.canWrite()) { f.delete(); } } } |
Algumas observações:
O ImportMarcCommand processa um registro MARC por vez (então, o processo pode ser finalizado logo que o registro atual for processado).
O objeto reader é responsável por identificar o registro e mover o indicador da posição de leitura do arquivo.
O objeto channel permite que essa informação (sobre o indicador da posição seja computado) seja computada no cálculo do total decorrido.
O Logger é o repositório com as informações sobre o histórico de cada processamento (tempo inicial, final , etc).
No método finish, é registrado o fim do processamento e o arquivo é apagado.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | public class ThreadMonitorServlet extends HttpServlet { private static final long serialVersionUID = 1L; @Override public void destroy() { for ( int i = 0 ; i < workers.length; i++) { workers[i].setCondicao( false ); try { queue.put(ImportWorker.NO_MORE_WORK); } catch (InterruptedException e) { e.printStackTrace(); } } super .destroy(); } static final int capacity = 10 ; final int numWorkers = 3 ; public static BlockingQueue<command></command> queue = new ArrayBlockingQueue<command></command>( capacity); ImportWorker[] workers = new ImportWorker[numWorkers]; @Override public void init(ServletConfig servletConfig) throws ServletException { super .init(servletConfig); for ( int i = 0 ; i < workers.length; i++) { workers[i] = new ImportWorker(queue); workers[i].start(); } } } |
<servlet> <servlet-name>monitor</servlet-name> <servlet-class> br.usp.ime.colmeia.async.ThreadMonitorServlet </servlet-class> <load-on-startup>1</load-on-startup> </servlet>Depois disso, tem-se a ação para listar a situação do processamento de cada arquivo.
Comentários